diff --git a/routes/morse.py b/routes/morse.py index 42c1818..9188637 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -219,6 +219,14 @@ def _validate_signal_gate(value: Any) -> float: raise ValueError(f'Invalid signal gate: {value}') from e +def _validate_detect_mode(value: Any) -> str: + """Validate detection mode ('goertzel' or 'envelope').""" + mode = str(value or 'goertzel').lower().strip() + if mode not in ('goertzel', 'envelope'): + raise ValueError("detect_mode must be 'goertzel' or 'envelope'") + return mode + + def _snapshot_live_resources() -> list[str]: alive: list[str] = [] if morse_decoder_worker and morse_decoder_worker.is_alive(): @@ -238,8 +246,15 @@ def start_morse() -> Response: data = request.json or {} + # Validate detect_mode first — it determines frequency limits. try: - freq = validate_frequency(data.get('frequency', '14.060'), min_mhz=0.5, max_mhz=30.0) + detect_mode = _validate_detect_mode(data.get('detect_mode', 'goertzel')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 + + freq_max = 1766.0 if detect_mode == 'envelope' else 30.0 + try: + freq = validate_frequency(data.get('frequency', '14.060'), min_mhz=0.5, max_mhz=freq_max) gain = validate_gain(data.get('gain', '0')) ppm = validate_ppm(data.get('ppm', '0')) device = validate_device_index(data.get('device', '0')) @@ -289,7 +304,15 @@ def start_morse() -> Response: _drain_queue(app_module.morse_queue) _set_state(MORSE_STARTING, 'Starting decoder...') - sample_rate = 22050 + # Envelope mode (OOK/AM): use AM demod, higher sample rate for better + # envelope resolution. Goertzel mode (HF CW): use USB demod. + if detect_mode == 'envelope': + sample_rate = 48000 + modulation = 'am' + else: + sample_rate = 22050 + modulation = 'usb' + bias_t = _bool_value(data.get('bias_t', False), False) try: @@ -322,7 +345,11 @@ def start_morse() -> Response: return f'device {device_index} ({name}, SN: {serial})' def _build_rtl_cmd(device_index: int, direct_sampling_mode: int | None) -> list[str]: - tuned_frequency_mhz = max(0.5, float(freq) - (float(tone_freq) / 1_000_000.0)) + # Envelope mode tunes directly to center freq (no tone offset). + if detect_mode == 'envelope': + tuned_frequency_mhz = max(0.5, float(freq)) + else: + tuned_frequency_mhz = max(0.5, float(freq) - (float(tone_freq) / 1_000_000.0)) sdr_device = SDRFactory.create_default_device(sdr_type, index=device_index) fm_kwargs: dict[str, Any] = { 'device': sdr_device, @@ -330,7 +357,7 @@ def start_morse() -> Response: 'sample_rate': sample_rate, 'gain': float(gain) if gain and gain != '0' else None, 'ppm': int(ppm) if ppm and ppm != '0' else None, - 'modulation': 'usb', + 'modulation': modulation, 'bias_t': bias_t, } if direct_sampling_mode in (1, 2): @@ -342,13 +369,19 @@ def start_morse() -> Response: cmd.append('-') return cmd - can_try_direct_sampling = bool(sdr_type == SDRType.RTL_SDR and float(freq) < 24.0) + can_try_direct_sampling = bool( + sdr_type == SDRType.RTL_SDR + and detect_mode != 'envelope' # direct sampling is HF-only + and float(freq) < 24.0 + ) direct_sampling_attempts: list[int | None] = [2, 1, None] if can_try_direct_sampling else [None] runtime_config: dict[str, Any] = { 'sample_rate': sample_rate, + 'detect_mode': detect_mode, + 'modulation': modulation, 'rf_frequency_mhz': float(freq), - 'tuned_frequency_mhz': max(0.5, float(freq) - (float(tone_freq) / 1_000_000.0)), + 'tuned_frequency_mhz': max(0.5, float(freq)) if detect_mode == 'envelope' else max(0.5, float(freq) - (float(tone_freq) / 1_000_000.0)), 'tone_freq': tone_freq, 'wpm': wpm, 'bandwidth_hz': bandwidth_hz, @@ -663,6 +696,8 @@ def start_morse() -> Response: 'status': 'started', 'state': MORSE_RUNNING, 'command': full_cmd, + 'detect_mode': detect_mode, + 'modulation': modulation, 'tone_freq': tone_freq, 'wpm': wpm, 'config': runtime_config, diff --git a/static/js/modes/morse.js b/static/js/modes/morse.js index 94d191e..320ea5e 100644 --- a/static/js/modes/morse.js +++ b/static/js/modes/morse.js @@ -103,6 +103,7 @@ var MorseMode = (function () { device: (el('deviceSelect') && el('deviceSelect').value) || '0', sdr_type: (el('sdrTypeSelect') && el('sdrTypeSelect').value) || 'rtlsdr', bias_t: (typeof getBiasTEnabled === 'function') ? getBiasTEnabled() : false, + detect_mode: (el('morseDetectMode') && el('morseDetectMode').value) || 'goertzel', tone_freq: (el('morseToneFreq') && el('morseToneFreq').value) || '700', bandwidth_hz: (el('morseBandwidth') && el('morseBandwidth').value) || '200', auto_tone_track: !!(el('morseAutoToneTrack') && el('morseAutoToneTrack').checked), @@ -124,6 +125,7 @@ var MorseMode = (function () { frequency: (el('morseFrequency') && el('morseFrequency').value) || '14.060', gain: (el('morseGain') && el('morseGain').value) || '40', ppm: (el('morsePPM') && el('morsePPM').value) || '0', + detect_mode: (el('morseDetectMode') && el('morseDetectMode').value) || 'goertzel', tone_freq: (el('morseToneFreq') && el('morseToneFreq').value) || '700', bandwidth_hz: (el('morseBandwidth') && el('morseBandwidth').value) || '200', auto_tone_track: !!(el('morseAutoToneTrack') && el('morseAutoToneTrack').checked), @@ -167,6 +169,9 @@ var MorseMode = (function () { if (el('morseShowRaw') && settings.show_raw !== undefined) el('morseShowRaw').checked = !!settings.show_raw; if (el('morseShowDiag') && settings.show_diag !== undefined) el('morseShowDiag').checked = !!settings.show_diag; + if (settings.detect_mode) { + setDetectMode(settings.detect_mode); + } updateToneLabel((el('morseToneFreq') && el('morseToneFreq').value) || '700'); updateWpmLabel((el('morseWpm') && el('morseWpm').value) || '15'); onThresholdModeChange(); @@ -198,10 +203,11 @@ var MorseMode = (function () { state.controlsBound = true; var ids = [ - 'morseFrequency', 'morseGain', 'morsePPM', 'morseToneFreq', 'morseBandwidth', - 'morseAutoToneTrack', 'morseToneLock', 'morseThresholdMode', 'morseManualThreshold', - 'morseThresholdMultiplier', 'morseThresholdOffset', 'morseSignalGate', - 'morseWpmMode', 'morseWpm', 'morseWpmLock', 'morseShowRaw', 'morseShowDiag' + 'morseFrequency', 'morseGain', 'morsePPM', 'morseDetectMode', 'morseToneFreq', + 'morseBandwidth', 'morseAutoToneTrack', 'morseToneLock', 'morseThresholdMode', + 'morseManualThreshold', 'morseThresholdMultiplier', 'morseThresholdOffset', + 'morseSignalGate', 'morseWpmMode', 'morseWpm', 'morseWpmLock', + 'morseShowRaw', 'morseShowDiag' ]; ids.forEach(function (id) { @@ -1199,12 +1205,80 @@ var MorseMode = (function () { }); } + function setDetectMode(mode) { + var hidden = el('morseDetectMode'); + if (hidden) hidden.value = mode; + + // Update toggle button styles + var btnGoertzel = el('morseDetectGoertzel'); + var btnEnvelope = el('morseDetectEnvelope'); + if (btnGoertzel && btnEnvelope) { + if (mode === 'envelope') { + btnEnvelope.style.background = 'var(--accent)'; + btnEnvelope.style.color = '#000'; + btnGoertzel.style.background = ''; + btnGoertzel.style.color = ''; + } else { + btnGoertzel.style.background = 'var(--accent)'; + btnGoertzel.style.color = '#000'; + btnEnvelope.style.background = ''; + btnEnvelope.style.color = ''; + } + } + + // Toggle preset groups + var hfPresets = el('morseHFPresets'); + var ismPresets = el('morseISMPresets'); + if (hfPresets) hfPresets.style.display = mode === 'envelope' ? 'none' : 'flex'; + if (ismPresets) ismPresets.style.display = mode === 'envelope' ? 'flex' : 'none'; + + // Toggle CW detector section (tone freq, bandwidth, tone track -- not needed for envelope) + var toneGroup = el('morseToneFreqGroup'); + if (toneGroup) toneGroup.style.display = mode === 'envelope' ? 'none' : ''; + + // Toggle antenna notes + var hfNote = el('morseHFNote'); + var envNote = el('morseEnvelopeNote'); + if (hfNote) hfNote.style.display = mode === 'envelope' ? 'none' : ''; + if (envNote) envNote.style.display = mode === 'envelope' ? '' : 'none'; + + // Update hint text + var hint = el('morseDetectHint'); + if (hint) { + hint.textContent = mode === 'envelope' + ? 'OOK Envelope: AM demod, RMS detection. For ISM-band OOK/CW.' + : 'CW Tone: HF bands, USB demod, Goertzel filter. For amateur CW.'; + } + + // Set sensible default frequency when switching modes + var freqEl = el('morseFrequency'); + if (freqEl) { + var curFreq = parseFloat(freqEl.value); + if (mode === 'envelope' && curFreq < 30) { + freqEl.value = '433.300'; + } else if (mode === 'goertzel' && curFreq > 30) { + freqEl.value = '14.060'; + } + } + + // Set WPM default for envelope mode (OOK transmitters tend to be slower) + var wpmEl = el('morseWpm'); + var wpmLabel = el('morseWpmLabel'); + if (mode === 'envelope' && wpmEl) { + wpmEl.value = '12'; + if (wpmLabel) wpmLabel.textContent = '12'; + } + + persistSettings(); + } + return { init: init, destroy: destroy, start: start, stop: stop, setFreq: setFreq, + setDetectMode: setDetectMode, exportTxt: exportTxt, exportCsv: exportCsv, copyToClipboard: copyToClipboard, diff --git a/templates/partials/modes/morse.html b/templates/partials/modes/morse.html index 84f5042..9310440 100644 --- a/templates/partials/modes/morse.html +++ b/templates/partials/modes/morse.html @@ -3,21 +3,39 @@

CW/Morse Decoder

- Decode CW (continuous wave) Morse with USB demod + Goertzel tone detection. - Start with 700 Hz tone and 200 Hz bandwidth. + Decode CW (continuous wave) Morse code. Supports HF amateur bands (USB + Goertzel tone + detection) and ISM/UHF OOK signals (AM + envelope detection).

+
+

Detection Mode

+
+
+ + +
+ +

+ CW Tone: HF bands, USB demod, Goertzel filter. For amateur CW. +

+
+
+

Frequency

- + Enter CW center frequency in MHz (e.g., 7.030 for 40m).
-
+
@@ -27,6 +45,13 @@
+
@@ -42,7 +67,7 @@
-
+

CW Detector

@@ -154,12 +179,18 @@
-
+

CW on HF (1-30 MHz) requires an HF-capable SDR path (direct sampling or upconverter) and an appropriate antenna.

+ diff --git a/tests/test_morse.py b/tests/test_morse.py index 22fafc0..30449d5 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -12,20 +12,18 @@ import time import wave from collections import Counter -import pytest - import app as app_module import routes.morse as morse_routes from utils.morse import ( CHAR_TO_MORSE, MORSE_TABLE, + EnvelopeDetector, GoertzelFilter, MorseDecoder, decode_morse_wav_file, morse_decoder_thread, ) - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -133,6 +131,93 @@ class TestToneDetector: assert gf.magnitude(on_tone) > gf.magnitude(off_tone) * 3.0 +class TestEnvelopeDetector: + def test_magnitude_of_silence_is_near_zero(self): + det = EnvelopeDetector(block_size=160) + silence = [0.0] * 160 + assert det.magnitude(silence) < 1e-6 + + def test_magnitude_of_constant_amplitude(self): + det = EnvelopeDetector(block_size=160) + loud = [0.8] * 160 + mag = det.magnitude(loud) + assert abs(mag - 0.8) < 0.01 + + def test_magnitude_of_sine_wave(self): + det = EnvelopeDetector(block_size=160) + samples = [0.5 * math.sin(2 * math.pi * 700 * i / 8000.0) for i in range(160)] + mag = det.magnitude(samples) + # RMS of a sine at amplitude 0.5 is 0.5/sqrt(2) ~ 0.354 + assert 0.30 < mag < 0.40 + + def test_magnitude_with_numpy_array(self): + import numpy as np + det = EnvelopeDetector(block_size=100) + arr = np.ones(100, dtype=np.float64) * 0.6 + assert abs(det.magnitude(arr) - 0.6) < 0.01 + + def test_empty_samples_returns_zero(self): + det = EnvelopeDetector(block_size=0) + assert det.magnitude([]) == 0.0 + + +class TestEnvelopeMorseDecoder: + def test_envelope_decoder_detects_ook_elements(self): + """Verify envelope mode can distinguish on/off keying.""" + sample_rate = 48000 + wpm = 15 + dit_dur = 1.2 / wpm + + def ook_on(duration): + n = int(sample_rate * duration) + return struct.pack(f'<{n}h', *([int(0.7 * 32767)] * n)) + + def ook_off(duration): + n = int(sample_rate * duration) + return b'\x00\x00' * n + + # Generate dit-dah (A = .-) + audio = ( + ook_off(0.3) + + ook_on(dit_dur) + + ook_off(dit_dur) + + ook_on(3 * dit_dur) + + ook_off(0.5) + ) + + decoder = MorseDecoder( + sample_rate=sample_rate, + tone_freq=700.0, + wpm=wpm, + detect_mode='envelope', + ) + events = decoder.process_block(audio) + events.extend(decoder.flush()) + elements = [e['element'] for e in events if e.get('type') == 'morse_element'] + + assert '.' in elements + assert '-' in elements + + def test_envelope_metrics_have_zero_snr(self): + """Envelope mode metrics should report zero SNR fields.""" + decoder = MorseDecoder( + sample_rate=8000, + detect_mode='envelope', + ) + metrics = decoder.get_metrics() + assert metrics['detect_mode'] == 'envelope' + assert metrics['snr'] == 0.0 + assert metrics['noise_ref'] == 0.0 + + def test_goertzel_mode_unchanged(self): + """Default goertzel mode still works as before.""" + decoder = MorseDecoder(sample_rate=8000, wpm=15) + assert decoder.detect_mode == 'goertzel' + metrics = decoder.get_metrics() + assert 'detect_mode' in metrics + assert metrics['detect_mode'] == 'goertzel' + + class TestTimingAndWpmEstimator: def test_timing_classifier_distinguishes_dit_and_dah(self): decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15) diff --git a/utils/morse.py b/utils/morse.py index a0be082..3849a77 100644 --- a/utils/morse.py +++ b/utils/morse.py @@ -1,10 +1,12 @@ -"""Morse code (CW) decoding helpers. +"""Morse code (CW) decoding helpers with dual detection modes. -Signal chain: -- SDR audio from `rtl_fm -M usb` (16-bit LE PCM) -- Goertzel tone detection with optional auto-tone tracking -- Adaptive threshold + hysteresis + minimum signal gate -- Timing estimator (auto/manual WPM) and Morse symbol decoding +Supports two signal chains: + goertzel: rtl_fm -M usb -> raw PCM -> Goertzel tone filter -> timing state machine -> characters + envelope: rtl_fm -M am -> raw PCM -> RMS envelope -> timing state machine -> characters + +Goertzel mode is the original path for HF CW (beat note detection). +Envelope mode adds support for OOK/AM signals (e.g. 433 MHz carrier keying) +where AM demod already produces a baseband envelope -- no tone to detect. """ from __future__ import annotations @@ -80,6 +82,25 @@ class GoertzelFilter: return math.sqrt(max(power, 0.0)) +class EnvelopeDetector: + """RMS envelope detector for AM-demodulated OOK signals. + + When rtl_fm uses -M am, carrier-on produces a high amplitude envelope + and carrier-off produces near-silence. RMS over a short block gives + a clean on/off metric without needing a specific tone frequency. + """ + + def __init__(self, block_size: int): + self.block_size = block_size + + def magnitude(self, samples: list[float] | tuple[float, ...] | np.ndarray) -> float: + """Compute RMS magnitude of the sample block.""" + arr = np.asarray(samples, dtype=np.float64) + if arr.size == 0: + return 0.0 + return float(np.sqrt(np.mean(np.square(arr)))) + + def _goertzel_mag(samples: np.ndarray, target_freq: float, sample_rate: int) -> float: """Compute Goertzel magnitude, preferring shared DSP helper.""" if _shared_goertzel_mag is not None: @@ -137,10 +158,12 @@ class MorseDecoder: wpm_mode: str = 'auto', wpm_lock: bool = False, min_signal_gate: float = 0.0, + detect_mode: str = 'goertzel', ): self.sample_rate = int(sample_rate) self.tone_freq = float(tone_freq) self.wpm = int(wpm) + self.detect_mode = detect_mode if detect_mode in ('goertzel', 'envelope') else 'goertzel' self.bandwidth_hz = int(_clamp(float(bandwidth_hz), 50, 400)) self.auto_tone_track = bool(auto_tone_track) @@ -163,17 +186,22 @@ class MorseDecoder: self._tone_scan_step_hz = 10.0 self._tone_scan_interval_blocks = 8 - self._detector = GoertzelFilter(self._active_tone_freq, self.sample_rate, self._block_size) - self._noise_detector_low = GoertzelFilter( - _clamp(self._active_tone_freq - max(150.0, self.bandwidth_hz), 150.0, 2000.0), - self.sample_rate, - self._block_size, - ) - self._noise_detector_high = GoertzelFilter( - _clamp(self._active_tone_freq + max(150.0, self.bandwidth_hz), 150.0, 2000.0), - self.sample_rate, - self._block_size, - ) + if self.detect_mode == 'envelope': + self._detector = EnvelopeDetector(self._block_size) + self._noise_detector_low = None + self._noise_detector_high = None + else: + self._detector = GoertzelFilter(self._active_tone_freq, self.sample_rate, self._block_size) + self._noise_detector_low = GoertzelFilter( + _clamp(self._active_tone_freq - max(150.0, self.bandwidth_hz), 150.0, 2000.0), + self.sample_rate, + self._block_size, + ) + self._noise_detector_high = GoertzelFilter( + _clamp(self._active_tone_freq + max(150.0, self.bandwidth_hz), 150.0, 2000.0), + self.sample_rate, + self._block_size, + ) # AGC for weak HF/direct-sampling signals. self._agc_target = 0.22 @@ -181,8 +209,14 @@ class MorseDecoder: self._agc_alpha = 0.06 # Envelope smoothing. - self._attack_alpha = 0.55 - self._release_alpha = 0.45 + # OOK has clean binary transitions; use symmetric fast alpha. + # HF CW has gradual fading (QSB); use asymmetric slower release. + if self.detect_mode == 'envelope': + self._attack_alpha = 0.55 + self._release_alpha = 0.55 + else: + self._attack_alpha = 0.55 + self._release_alpha = 0.45 self._envelope = 0.0 # Adaptive threshold model. @@ -203,8 +237,13 @@ class MorseDecoder: dit_blocks = max(1.0, dit_sec / self._block_duration) self._dah_threshold = 2.2 * dit_blocks self._dit_min = 0.38 * dit_blocks - self._char_gap = 2.6 * dit_blocks - self._word_gap = 6.0 * dit_blocks + if self.detect_mode == 'envelope': + # Tighter gaps for OOK — clean binary transitions tolerate this. + self._char_gap = 2.0 * dit_blocks + self._word_gap = 5.0 * dit_blocks + else: + self._char_gap = 2.6 * dit_blocks + self._word_gap = 6.0 * dit_blocks self._dit_observations: deque[float] = deque(maxlen=32) self._estimated_wpm = float(self.wpm) @@ -236,10 +275,7 @@ class MorseDecoder: def get_metrics(self) -> dict[str, float | bool]: """Return latest decoder metrics for UI/status messages.""" - snr_mult = max(1.15, self.threshold_multiplier * 0.5) - snr_on = snr_mult * (1.0 + self._hysteresis) - snr_off = snr_mult * (1.0 - self._hysteresis) - return { + metrics: dict[str, Any] = { 'wpm': float(self._estimated_wpm), 'tone_freq': float(self._active_tone_freq), 'level': float(self._last_level), @@ -247,14 +283,27 @@ class MorseDecoder: 'threshold': float(self._threshold), 'tone_on': bool(self._tone_on), 'dit_ms': float((self._effective_dit_blocks() * self._block_duration) * 1000.0), - 'snr': float(self._last_level / max(self._noise_floor, 1e-6)), - 'noise_ref': float(self._noise_floor), - 'snr_on': float(snr_on), - 'snr_off': float(snr_off), + 'detect_mode': self.detect_mode, } + if self.detect_mode == 'envelope': + metrics['snr'] = 0.0 + metrics['noise_ref'] = 0.0 + metrics['snr_on'] = 0.0 + metrics['snr_off'] = 0.0 + else: + snr_mult = max(1.15, self.threshold_multiplier * 0.5) + snr_on = snr_mult * (1.0 + self._hysteresis) + snr_off = snr_mult * (1.0 - self._hysteresis) + metrics['snr'] = float(self._last_level / max(self._noise_floor, 1e-6)) + metrics['noise_ref'] = float(self._noise_floor) + metrics['snr_on'] = float(snr_on) + metrics['snr_off'] = float(snr_off) + return metrics def _rebuild_detectors(self) -> None: """Rebuild target/noise Goertzel filters after tone updates.""" + if self.detect_mode == 'envelope': + return # Envelope detector is frequency-agnostic self._detector = GoertzelFilter(self._active_tone_freq, self.sample_rate, self._block_size) ref_offset = max(150.0, self.bandwidth_hz) self._noise_detector_low = GoertzelFilter( @@ -391,93 +440,143 @@ class MorseDecoder: self._blocks_processed += 1 mag = self._detector.magnitude(normalized) - noise_low = self._noise_detector_low.magnitude(normalized) - noise_high = self._noise_detector_high.magnitude(normalized) - noise_ref = max(1e-9, (noise_low + noise_high) * 0.5) - if ( - self.auto_tone_track - and not self.tone_lock - and self._blocks_processed > self._WARMUP_BLOCKS - and (self._blocks_processed % self._tone_scan_interval_blocks == 0) - and self._estimate_tone_frequency(normalized, mag, noise_ref) - ): - # Detector changed; refresh magnitudes for this window. - mag = self._detector.magnitude(normalized) + if self.detect_mode == 'envelope': + # Envelope mode: direct magnitude threshold, no noise detectors + noise_ref = 0.0 + level = float(mag) + alpha = self._attack_alpha if level >= self._envelope else self._release_alpha + self._envelope += alpha * (level - self._envelope) + self._last_level = self._envelope + self._last_noise_ref = 0.0 + amplitudes.append(level) + + if self._blocks_processed <= self._WARMUP_BLOCKS: + self._mag_min = min(self._mag_min, level) + self._mag_max = max(self._mag_max, level) + if self._blocks_processed == self._WARMUP_BLOCKS: + self._noise_floor = self._mag_min if math.isfinite(self._mag_min) else 0.0 + if self._mag_max <= (self._noise_floor * 1.2): + self._signal_peak = max(self._noise_floor + 0.5, self._noise_floor * 2.5) + else: + self._signal_peak = max(self._mag_max, self._noise_floor * 1.8) + self._threshold = self._noise_floor + 0.22 * ( + self._signal_peak - self._noise_floor + ) + tone_detected = False + else: + settle_alpha = 0.30 if self._blocks_processed < (self._WARMUP_BLOCKS + self._SETTLE_BLOCKS) else 0.06 + if level <= self._threshold: + self._noise_floor += settle_alpha * (level - self._noise_floor) + else: + self._signal_peak += settle_alpha * (level - self._signal_peak) + self._signal_peak = max(self._signal_peak, self._noise_floor * 1.05) + + if self.threshold_mode == 'manual': + self._threshold = max(0.0, self.manual_threshold) + else: + self._threshold = ( + max(0.0, self._noise_floor * self.threshold_multiplier) + + self.threshold_offset + ) + self._threshold = max(self._threshold, self._noise_floor + 0.35) + + dynamic_span = max(0.0, self._signal_peak - self._noise_floor) + gate_level = self._noise_floor + (self.min_signal_gate * dynamic_span) + gate_ok = self.min_signal_gate <= 0.0 or level >= gate_level + + # Direct magnitude threshold with hysteresis (no SNR) + if self._tone_on: + tone_detected = gate_ok and level >= (self._threshold * (1.0 - self._hysteresis)) + else: + tone_detected = gate_ok and level >= (self._threshold * (1.0 + self._hysteresis)) + else: + # Goertzel mode: SNR-based tone detection with noise reference noise_low = self._noise_detector_low.magnitude(normalized) noise_high = self._noise_detector_high.magnitude(normalized) noise_ref = max(1e-9, (noise_low + noise_high) * 0.5) - level = float(mag) - alpha = self._attack_alpha if level >= self._envelope else self._release_alpha - self._envelope += alpha * (level - self._envelope) - self._last_level = self._envelope - self._last_noise_ref = noise_ref - amplitudes.append(level) + if ( + self.auto_tone_track + and not self.tone_lock + and self._blocks_processed > self._WARMUP_BLOCKS + and (self._blocks_processed % self._tone_scan_interval_blocks == 0) + and self._estimate_tone_frequency(normalized, mag, noise_ref) + ): + # Detector changed; refresh magnitudes for this window. + mag = self._detector.magnitude(normalized) + noise_low = self._noise_detector_low.magnitude(normalized) + noise_high = self._noise_detector_high.magnitude(normalized) + noise_ref = max(1e-9, (noise_low + noise_high) * 0.5) - if self._blocks_processed <= self._WARMUP_BLOCKS: - self._mag_min = min(self._mag_min, level) - self._mag_max = max(self._mag_max, level) - if self._blocks_processed == self._WARMUP_BLOCKS: - self._noise_floor = self._mag_min if math.isfinite(self._mag_min) else 0.0 - if self._mag_max <= (self._noise_floor * 1.2): - self._signal_peak = max(self._noise_floor + 0.5, self._noise_floor * 2.5) + level = float(mag) + alpha = self._attack_alpha if level >= self._envelope else self._release_alpha + self._envelope += alpha * (level - self._envelope) + self._last_level = self._envelope + self._last_noise_ref = noise_ref + amplitudes.append(level) + + if self._blocks_processed <= self._WARMUP_BLOCKS: + self._mag_min = min(self._mag_min, level) + self._mag_max = max(self._mag_max, level) + if self._blocks_processed == self._WARMUP_BLOCKS: + self._noise_floor = self._mag_min if math.isfinite(self._mag_min) else 0.0 + if self._mag_max <= (self._noise_floor * 1.2): + self._signal_peak = max(self._noise_floor + 0.5, self._noise_floor * 2.5) + else: + self._signal_peak = max(self._mag_max, self._noise_floor * 1.8) + self._threshold = self._noise_floor + 0.22 * ( + self._signal_peak - self._noise_floor + ) + tone_detected = False + else: + settle_alpha = 0.30 if self._blocks_processed < (self._WARMUP_BLOCKS + self._SETTLE_BLOCKS) else 0.06 + + detector_level = level + + if detector_level <= self._threshold: + self._noise_floor += settle_alpha * (detector_level - self._noise_floor) else: - self._signal_peak = max(self._mag_max, self._noise_floor * 1.8) - self._threshold = self._noise_floor + 0.22 * ( - self._signal_peak - self._noise_floor - ) - tone_detected = False - else: - settle_alpha = 0.30 if self._blocks_processed < (self._WARMUP_BLOCKS + self._SETTLE_BLOCKS) else 0.06 + self._signal_peak += settle_alpha * (detector_level - self._signal_peak) - detector_level = level + self._signal_peak = max(self._signal_peak, self._noise_floor * 1.05) - if detector_level <= self._threshold: - self._noise_floor += settle_alpha * (detector_level - self._noise_floor) - else: - self._signal_peak += settle_alpha * (detector_level - self._signal_peak) + # Blend adjacent-band noise reference into noise floor. + self._noise_floor += (settle_alpha * 0.25) * (noise_ref - self._noise_floor) - self._signal_peak = max(self._signal_peak, self._noise_floor * 1.05) + if self.threshold_mode == 'manual': + self._threshold = max(0.0, self.manual_threshold) + else: + self._threshold = ( + max(0.0, self._noise_floor * self.threshold_multiplier) + + self.threshold_offset + ) + self._threshold = max(self._threshold, self._noise_floor + 0.35) - # Always blend adjacent-band noise reference into noise floor. - # Adjacent bands track the same AGC gain but exclude the tone, - # so this prevents noise floor from staying stuck at warmup-era - # low values after AGC converges. - self._noise_floor += (settle_alpha * 0.25) * (noise_ref - self._noise_floor) + dynamic_span = max(0.0, self._signal_peak - self._noise_floor) + gate_level = self._noise_floor + (self.min_signal_gate * dynamic_span) + gate_ok = self.min_signal_gate <= 0.0 or detector_level >= gate_level - if self.threshold_mode == 'manual': - self._threshold = max(0.0, self.manual_threshold) - else: - self._threshold = ( - max(0.0, self._noise_floor * self.threshold_multiplier) - + self.threshold_offset - ) - self._threshold = max(self._threshold, self._noise_floor + 0.35) + # SNR-based tone detection (gain-invariant). + snr = level / max(noise_ref, 1e-6) + snr_mult = max(1.15, self.threshold_multiplier * 0.5) + snr_on = snr_mult * (1.0 + self._hysteresis) + snr_off = snr_mult * (1.0 - self._hysteresis) - dynamic_span = max(0.0, self._signal_peak - self._noise_floor) - gate_level = self._noise_floor + (self.min_signal_gate * dynamic_span) - gate_ok = self.min_signal_gate <= 0.0 or detector_level >= gate_level - - # Use SNR (tone mag / adjacent-band noise) for tone detection. - # Both bands are equally amplified by AGC, so the ratio is - # gain-invariant — fixes stuck-ON tone when AGC amplifies - # inter-element silence above the raw magnitude threshold. - snr = level / max(noise_ref, 1e-6) - snr_mult = max(1.15, self.threshold_multiplier * 0.5) - snr_on = snr_mult * (1.0 + self._hysteresis) - snr_off = snr_mult * (1.0 - self._hysteresis) - - if self._tone_on: - tone_detected = gate_ok and snr >= snr_off - else: - tone_detected = gate_ok and snr >= snr_on + if self._tone_on: + tone_detected = gate_ok and snr >= snr_off + else: + tone_detected = gate_ok and snr >= snr_on dit_blocks = self._effective_dit_blocks() self._dah_threshold = 2.2 * dit_blocks self._dit_min = max(1.0, 0.38 * dit_blocks) - self._char_gap = 2.6 * dit_blocks - self._word_gap = 6.0 * dit_blocks + if self.detect_mode == 'envelope': + self._char_gap = 2.0 * dit_blocks + self._word_gap = 5.0 * dit_blocks + else: + self._char_gap = 2.6 * dit_blocks + self._word_gap = 6.0 * dit_blocks if tone_detected and not self._tone_on: # Tone edge up. @@ -548,10 +647,7 @@ class MorseDecoder: self._silence_blocks += 1.0 if amplitudes: - snr_mult = max(1.15, self.threshold_multiplier * 0.5) - snr_on = snr_mult * (1.0 + self._hysteresis) - snr_off = snr_mult * (1.0 - self._hysteresis) - events.append({ + scope_event: dict[str, Any] = { 'type': 'scope', 'amplitudes': amplitudes, 'threshold': self._threshold, @@ -561,11 +657,22 @@ class MorseDecoder: 'noise_floor': self._noise_floor, 'wpm': round(self._estimated_wpm, 1), 'dit_ms': round(self._effective_dit_blocks() * self._block_duration * 1000.0, 1), - 'snr': round(self._last_level / max(self._noise_floor, 1e-6), 2), - 'noise_ref': round(self._noise_floor, 4), - 'snr_on': round(snr_on, 2), - 'snr_off': round(snr_off, 2), - }) + 'detect_mode': self.detect_mode, + } + if self.detect_mode == 'envelope': + scope_event['snr'] = 0.0 + scope_event['noise_ref'] = 0.0 + scope_event['snr_on'] = 0.0 + scope_event['snr_off'] = 0.0 + else: + snr_mult = max(1.15, self.threshold_multiplier * 0.5) + snr_on = snr_mult * (1.0 + self._hysteresis) + snr_off = snr_mult * (1.0 - self._hysteresis) + scope_event['snr'] = round(self._last_level / max(self._noise_floor, 1e-6), 2) + scope_event['noise_ref'] = round(self._noise_floor, 4) + scope_event['snr_on'] = round(snr_on, 2) + scope_event['snr_off'] = round(snr_off, 2) + events.append(scope_event) return events @@ -818,6 +925,7 @@ def morse_decoder_thread( wpm_mode=_normalize_wpm_mode(cfg.get('wpm_mode', 'auto')), wpm_lock=_coerce_bool(cfg.get('wpm_lock', False), False), min_signal_gate=float(cfg.get('min_signal_gate', 0.0) or 0.0), + detect_mode=str(cfg.get('detect_mode', 'goertzel')), ) last_scope = time.monotonic() @@ -1101,6 +1209,7 @@ def morse_iq_decoder_thread( wpm_mode=_normalize_wpm_mode(cfg.get('wpm_mode', 'auto')), wpm_lock=_coerce_bool(cfg.get('wpm_lock', False), False), min_signal_gate=float(cfg.get('min_signal_gate', 0.0) or 0.0), + detect_mode=str(cfg.get('detect_mode', 'goertzel')), ) last_scope = time.monotonic()