mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
Fix Morse decoder silent on real HF signals via AGC and warm-up
Add automatic gain control (AGC) before Goertzel processing to normalize quiet audio from direct sampling mode where the -g gain flag has no effect. Fix broken adaptive threshold bootstrap by adding a 50-block warm-up phase that collects magnitude statistics before seeding noise floor and signal peak. Lower threshold ratio from 50% to 30% for better weak-CW sensitivity. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -147,18 +147,17 @@ class TestGoertzelFilter:
|
||||
|
||||
class TestMorseDecoder:
|
||||
def _make_decoder(self, wpm=15):
|
||||
"""Create decoder with pre-warmed threshold for testing."""
|
||||
"""Create decoder with warm-up phase completed for testing.
|
||||
|
||||
Feeds silence then tone then silence to get past the warm-up
|
||||
blocks and establish a valid noise floor / signal peak.
|
||||
"""
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=wpm)
|
||||
# Warm up noise floor with silence
|
||||
silence = generate_silence(0.5)
|
||||
decoder.process_block(silence)
|
||||
# Warm up signal peak with tone
|
||||
tone = generate_tone(700.0, 0.3)
|
||||
decoder.process_block(tone)
|
||||
# More silence to settle
|
||||
silence2 = generate_silence(0.5)
|
||||
decoder.process_block(silence2)
|
||||
# Reset state after warm-up
|
||||
# Feed enough audio to get past warm-up (50 blocks = 1 sec)
|
||||
# Mix silence and tone so warm-up sees both noise and signal
|
||||
warmup_audio = generate_silence(0.6) + generate_tone(700.0, 0.4) + generate_silence(0.5)
|
||||
decoder.process_block(warmup_audio)
|
||||
# Reset state machine after warm-up so tests start clean
|
||||
decoder._tone_on = False
|
||||
decoder._current_symbol = ''
|
||||
decoder._tone_blocks = 0
|
||||
@@ -246,14 +245,14 @@ class TestMorseDecoder:
|
||||
assert 'tone_on' in se
|
||||
|
||||
def test_adaptive_threshold_adjusts(self):
|
||||
"""After processing audio, threshold should be non-zero."""
|
||||
"""After processing enough audio to complete warm-up, threshold should be non-zero."""
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15)
|
||||
|
||||
# Process some tone + silence
|
||||
audio = generate_tone(700.0, 0.3) + generate_silence(0.3)
|
||||
# Feed enough audio to complete the 50-block warm-up (~1 second)
|
||||
audio = generate_silence(0.6) + generate_tone(700.0, 0.4) + generate_silence(0.3)
|
||||
decoder.process_block(audio)
|
||||
|
||||
assert decoder._threshold > 0, "Threshold should adapt above zero"
|
||||
assert decoder._threshold > 0, "Threshold should adapt above zero after warm-up"
|
||||
|
||||
def test_flush_emits_pending_char(self):
|
||||
"""flush() should emit any accumulated but not-yet-decoded symbol."""
|
||||
@@ -269,6 +268,39 @@ class TestMorseDecoder:
|
||||
events = decoder.flush()
|
||||
assert events == []
|
||||
|
||||
def test_weak_signal_detection(self):
|
||||
"""CW tone at only 3x noise magnitude should still decode characters."""
|
||||
decoder = self._make_decoder(wpm=10)
|
||||
# Generate weak CW audio (low amplitude simulating weak HF signal)
|
||||
audio = generate_morse_audio('SOS', wpm=10, sample_rate=8000)
|
||||
# Scale to low amplitude (simulating weak signal)
|
||||
n_samples = len(audio) // 2
|
||||
samples = struct.unpack(f'<{n_samples}h', audio)
|
||||
# Reduce to ~10% amplitude
|
||||
weak_samples = [max(-32768, min(32767, int(s * 0.1))) for s in samples]
|
||||
weak_audio = struct.pack(f'<{len(weak_samples)}h', *weak_samples)
|
||||
|
||||
events = decoder.process_block(weak_audio)
|
||||
events.extend(decoder.flush())
|
||||
|
||||
chars = [e for e in events if e['type'] == 'morse_char']
|
||||
decoded = ''.join(e['char'] for e in chars)
|
||||
# Should decode at least some characters from the weak signal
|
||||
assert len(chars) >= 1, f"Expected decoded chars from weak signal, got '{decoded}'"
|
||||
|
||||
def test_agc_boosts_quiet_signal(self):
|
||||
"""Very quiet PCM (amplitude 0.01) should still produce usable Goertzel magnitudes."""
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15)
|
||||
# Generate very quiet tone
|
||||
quiet_tone = generate_tone(700.0, 1.5, amplitude=0.01) # 1.5s of very quiet CW
|
||||
events = decoder.process_block(quiet_tone)
|
||||
|
||||
scope_events = [e for e in events if e['type'] == 'scope']
|
||||
assert len(scope_events) > 0, "Expected scope events from quiet signal"
|
||||
# AGC should have boosted the signal — amplitudes should be visible
|
||||
max_amp = max(max(se['amplitudes']) for se in scope_events)
|
||||
assert max_amp > 1.0, f"AGC should boost quiet signal to usable magnitude, got {max_amp}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# morse_decoder_thread tests
|
||||
|
||||
@@ -95,11 +95,21 @@ class MorseDecoder:
|
||||
self._char_gap = 3.0 * dit_sec / self._block_duration # blocks
|
||||
self._word_gap = 7.0 * dit_sec / self._block_duration # blocks
|
||||
|
||||
# AGC (automatic gain control) for direct sampling / weak signals
|
||||
self._agc_target = 0.3 # target RMS amplitude (0-1 range)
|
||||
self._agc_gain = 1.0 # current AGC multiplier
|
||||
self._agc_alpha = 0.05 # EMA smoothing for gain changes
|
||||
|
||||
# Warm-up phase constants
|
||||
self._WARMUP_BLOCKS = 50 # ~1 second at 50 blocks/sec
|
||||
self._SETTLE_BLOCKS = 200 # blocks for fast→slow EMA transition
|
||||
self._mag_min = float('inf')
|
||||
self._mag_max = 0.0
|
||||
|
||||
# Adaptive threshold via EMA
|
||||
self._noise_floor = 0.0
|
||||
self._signal_peak = 0.0
|
||||
self._threshold = 0.0
|
||||
self._ema_alpha = 0.1 # smoothing factor
|
||||
|
||||
# State machine (counts in blocks, not wall-clock time)
|
||||
self._tone_on = False
|
||||
@@ -136,20 +146,47 @@ class MorseDecoder:
|
||||
|
||||
# Normalize to [-1, 1]
|
||||
normalized = [s / 32768.0 for s in block]
|
||||
|
||||
# AGC: boost quiet signals (e.g. direct sampling mode)
|
||||
rms = math.sqrt(sum(s * s for s in normalized) / len(normalized))
|
||||
if rms > 1e-6:
|
||||
desired_gain = self._agc_target / rms
|
||||
self._agc_gain += self._agc_alpha * (desired_gain - self._agc_gain)
|
||||
self._agc_gain = min(self._agc_gain, 500.0) # cap to prevent runaway
|
||||
normalized = [s * self._agc_gain for s in normalized]
|
||||
|
||||
mag = self._filter.magnitude(normalized)
|
||||
amplitudes.append(mag)
|
||||
|
||||
self._blocks_processed += 1
|
||||
|
||||
# Update adaptive threshold
|
||||
if mag < self._threshold or self._threshold == 0:
|
||||
self._noise_floor += self._ema_alpha * (mag - self._noise_floor)
|
||||
# Warm-up phase: collect statistics, suppress detection
|
||||
if self._blocks_processed <= self._WARMUP_BLOCKS:
|
||||
self._mag_min = min(self._mag_min, mag)
|
||||
self._mag_max = max(self._mag_max, mag)
|
||||
if self._blocks_processed == self._WARMUP_BLOCKS:
|
||||
# Seed thresholds from observed range
|
||||
self._noise_floor = self._mag_min
|
||||
self._signal_peak = max(self._mag_max, self._mag_min * 2)
|
||||
self._threshold = self._noise_floor + 0.3 * (
|
||||
self._signal_peak - self._noise_floor
|
||||
)
|
||||
tone_detected = False
|
||||
else:
|
||||
self._signal_peak += self._ema_alpha * (mag - self._signal_peak)
|
||||
# Adaptive EMA: fast initially, slow in steady state
|
||||
alpha = 0.3 if self._blocks_processed < self._WARMUP_BLOCKS + self._SETTLE_BLOCKS else 0.05
|
||||
|
||||
self._threshold = (self._noise_floor + self._signal_peak) / 2.0
|
||||
if mag < self._threshold:
|
||||
self._noise_floor += alpha * (mag - self._noise_floor)
|
||||
else:
|
||||
self._signal_peak += alpha * (mag - self._signal_peak)
|
||||
|
||||
tone_detected = mag > self._threshold and self._threshold > 0
|
||||
# Threshold at 30% between noise and signal (sensitive to weak CW)
|
||||
self._threshold = self._noise_floor + 0.3 * (
|
||||
self._signal_peak - self._noise_floor
|
||||
)
|
||||
|
||||
tone_detected = mag > self._threshold and self._threshold > 0
|
||||
|
||||
if tone_detected and not self._tone_on:
|
||||
# Tone just started - check silence duration for gaps
|
||||
|
||||
Reference in New Issue
Block a user