diff --git a/tests/test_morse.py b/tests/test_morse.py index bb6da32..da926f9 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -147,18 +147,17 @@ class TestGoertzelFilter: class TestMorseDecoder: def _make_decoder(self, wpm=15): - """Create decoder with pre-warmed threshold for testing.""" + """Create decoder with warm-up phase completed for testing. + + Feeds silence then tone then silence to get past the warm-up + blocks and establish a valid noise floor / signal peak. + """ decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=wpm) - # Warm up noise floor with silence - silence = generate_silence(0.5) - decoder.process_block(silence) - # Warm up signal peak with tone - tone = generate_tone(700.0, 0.3) - decoder.process_block(tone) - # More silence to settle - silence2 = generate_silence(0.5) - decoder.process_block(silence2) - # Reset state after warm-up + # Feed enough audio to get past warm-up (50 blocks = 1 sec) + # Mix silence and tone so warm-up sees both noise and signal + warmup_audio = generate_silence(0.6) + generate_tone(700.0, 0.4) + generate_silence(0.5) + decoder.process_block(warmup_audio) + # Reset state machine after warm-up so tests start clean decoder._tone_on = False decoder._current_symbol = '' decoder._tone_blocks = 0 @@ -246,14 +245,14 @@ class TestMorseDecoder: assert 'tone_on' in se def test_adaptive_threshold_adjusts(self): - """After processing audio, threshold should be non-zero.""" + """After processing enough audio to complete warm-up, threshold should be non-zero.""" decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15) - # Process some tone + silence - audio = generate_tone(700.0, 0.3) + generate_silence(0.3) + # Feed enough audio to complete the 50-block warm-up (~1 second) + audio = generate_silence(0.6) + generate_tone(700.0, 0.4) + generate_silence(0.3) decoder.process_block(audio) - assert decoder._threshold > 0, "Threshold should adapt above zero" + assert decoder._threshold > 0, "Threshold should adapt above zero after warm-up" def test_flush_emits_pending_char(self): """flush() should emit any accumulated but not-yet-decoded symbol.""" @@ -269,6 +268,39 @@ class TestMorseDecoder: events = decoder.flush() assert events == [] + def test_weak_signal_detection(self): + """CW tone at only 3x noise magnitude should still decode characters.""" + decoder = self._make_decoder(wpm=10) + # Generate weak CW audio (low amplitude simulating weak HF signal) + audio = generate_morse_audio('SOS', wpm=10, sample_rate=8000) + # Scale to low amplitude (simulating weak signal) + n_samples = len(audio) // 2 + samples = struct.unpack(f'<{n_samples}h', audio) + # Reduce to ~10% amplitude + weak_samples = [max(-32768, min(32767, int(s * 0.1))) for s in samples] + weak_audio = struct.pack(f'<{len(weak_samples)}h', *weak_samples) + + events = decoder.process_block(weak_audio) + events.extend(decoder.flush()) + + chars = [e for e in events if e['type'] == 'morse_char'] + decoded = ''.join(e['char'] for e in chars) + # Should decode at least some characters from the weak signal + assert len(chars) >= 1, f"Expected decoded chars from weak signal, got '{decoded}'" + + def test_agc_boosts_quiet_signal(self): + """Very quiet PCM (amplitude 0.01) should still produce usable Goertzel magnitudes.""" + decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15) + # Generate very quiet tone + quiet_tone = generate_tone(700.0, 1.5, amplitude=0.01) # 1.5s of very quiet CW + events = decoder.process_block(quiet_tone) + + scope_events = [e for e in events if e['type'] == 'scope'] + assert len(scope_events) > 0, "Expected scope events from quiet signal" + # AGC should have boosted the signal — amplitudes should be visible + max_amp = max(max(se['amplitudes']) for se in scope_events) + assert max_amp > 1.0, f"AGC should boost quiet signal to usable magnitude, got {max_amp}" + # --------------------------------------------------------------------------- # morse_decoder_thread tests diff --git a/utils/morse.py b/utils/morse.py index 1a957e5..543e719 100644 --- a/utils/morse.py +++ b/utils/morse.py @@ -95,11 +95,21 @@ class MorseDecoder: self._char_gap = 3.0 * dit_sec / self._block_duration # blocks self._word_gap = 7.0 * dit_sec / self._block_duration # blocks + # AGC (automatic gain control) for direct sampling / weak signals + self._agc_target = 0.3 # target RMS amplitude (0-1 range) + self._agc_gain = 1.0 # current AGC multiplier + self._agc_alpha = 0.05 # EMA smoothing for gain changes + + # Warm-up phase constants + self._WARMUP_BLOCKS = 50 # ~1 second at 50 blocks/sec + self._SETTLE_BLOCKS = 200 # blocks for fast→slow EMA transition + self._mag_min = float('inf') + self._mag_max = 0.0 + # Adaptive threshold via EMA self._noise_floor = 0.0 self._signal_peak = 0.0 self._threshold = 0.0 - self._ema_alpha = 0.1 # smoothing factor # State machine (counts in blocks, not wall-clock time) self._tone_on = False @@ -136,20 +146,47 @@ class MorseDecoder: # Normalize to [-1, 1] normalized = [s / 32768.0 for s in block] + + # AGC: boost quiet signals (e.g. direct sampling mode) + rms = math.sqrt(sum(s * s for s in normalized) / len(normalized)) + if rms > 1e-6: + desired_gain = self._agc_target / rms + self._agc_gain += self._agc_alpha * (desired_gain - self._agc_gain) + self._agc_gain = min(self._agc_gain, 500.0) # cap to prevent runaway + normalized = [s * self._agc_gain for s in normalized] + mag = self._filter.magnitude(normalized) amplitudes.append(mag) self._blocks_processed += 1 - # Update adaptive threshold - if mag < self._threshold or self._threshold == 0: - self._noise_floor += self._ema_alpha * (mag - self._noise_floor) + # Warm-up phase: collect statistics, suppress detection + if self._blocks_processed <= self._WARMUP_BLOCKS: + self._mag_min = min(self._mag_min, mag) + self._mag_max = max(self._mag_max, mag) + if self._blocks_processed == self._WARMUP_BLOCKS: + # Seed thresholds from observed range + self._noise_floor = self._mag_min + self._signal_peak = max(self._mag_max, self._mag_min * 2) + self._threshold = self._noise_floor + 0.3 * ( + self._signal_peak - self._noise_floor + ) + tone_detected = False else: - self._signal_peak += self._ema_alpha * (mag - self._signal_peak) + # Adaptive EMA: fast initially, slow in steady state + alpha = 0.3 if self._blocks_processed < self._WARMUP_BLOCKS + self._SETTLE_BLOCKS else 0.05 - self._threshold = (self._noise_floor + self._signal_peak) / 2.0 + if mag < self._threshold: + self._noise_floor += alpha * (mag - self._noise_floor) + else: + self._signal_peak += alpha * (mag - self._signal_peak) - tone_detected = mag > self._threshold and self._threshold > 0 + # Threshold at 30% between noise and signal (sensitive to weak CW) + self._threshold = self._noise_floor + 0.3 * ( + self._signal_peak - self._noise_floor + ) + + tone_detected = mag > self._threshold and self._threshold > 0 if tone_detected and not self._tone_on: # Tone just started - check silence duration for gaps