diff --git a/tests/test_sstv_decoder.py b/tests/test_sstv_decoder.py index ead653d..be9aa29 100644 --- a/tests/test_sstv_decoder.py +++ b/tests/test_sstv_decoder.py @@ -395,6 +395,121 @@ class TestVISDetector: assert vis_code == 8 assert mode_name == 'Robot36' + def test_noisy_leader_detection(self): + """Should detect VIS despite intermittent None windows in leader. + + Simulates HF fading by inserting short silence gaps (which produce + ambiguous tone classification) into the leader tone. + """ + detector = VISDetector() + parts = [] + + # Build leader1 with gaps: 50ms tone, 10ms silence, repeated + # Total ~300ms of leader with interruptions + for _ in range(6): + parts.append(generate_tone(FREQ_LEADER, 0.050)) + parts.append(np.zeros(int(SAMPLE_RATE * 0.010))) # 10ms gap + + # Break (1200 Hz, 10ms) + parts.append(generate_tone(FREQ_SYNC, 0.010)) + + # Leader 2 (clean) + parts.append(generate_tone(FREQ_LEADER, 0.300)) + + # Start bit + data bits + parity + stop (standard for Robot36 = VIS 8) + parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit + vis_code = 8 + ones_count = 0 + for i in range(8): + bit = (vis_code >> i) & 1 + if bit: + ones_count += 1 + parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030)) + else: + parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030)) + parity = ones_count % 2 + parts.append(generate_tone( + FREQ_VIS_BIT_1 if parity else FREQ_VIS_BIT_0, 0.030)) + parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit + + audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)]) + result = detector.feed(audio) + assert result is not None + assert result[0] == 8 + assert result[1] == 'Robot36' + + def test_vis_error_correction_parity_bit(self): + """Should recover when only the parity bit is corrupted.""" + detector = VISDetector() + # Generate Martin1 header (VIS 44) but flip the parity bit + parts = [] + parts.append(generate_tone(FREQ_LEADER, 0.300)) + parts.append(generate_tone(FREQ_SYNC, 0.010)) + parts.append(generate_tone(FREQ_LEADER, 0.300)) + parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit + + vis_code = 44 # Martin1 + ones_count = 0 + for i in range(8): + bit = (vis_code >> i) & 1 + if bit: + ones_count += 1 + parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030)) + else: + parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030)) + + # Wrong parity (flip it) + correct_parity = ones_count % 2 + wrong_parity = 1 - correct_parity + parts.append(generate_tone( + FREQ_VIS_BIT_1 if wrong_parity else FREQ_VIS_BIT_0, 0.030)) + parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit + + audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)]) + result = detector.feed(audio) + assert result is not None + assert result[0] == 44 + assert result[1] == 'Martin1' + + def test_vis_error_correction_data_bit(self): + """Should recover Martin1 when one data bit is flipped by HF noise. + + Simulates: Martin1 (VIS 44) transmitted correctly, but bit 0 is + corrupted during reception. The parity bit is received correctly + (computed for the original code 44), so parity check fails → error + correction tries flipping each data bit and finds VIS 44. + """ + detector = VISDetector() + original_code = 44 # Martin1 + corrupted_code = 44 ^ 1 # flip bit 0 → 45 + + parts = [] + parts.append(generate_tone(FREQ_LEADER, 0.300)) + parts.append(generate_tone(FREQ_SYNC, 0.010)) + parts.append(generate_tone(FREQ_LEADER, 0.300)) + parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit + + # Transmit corrupted data bits + for i in range(8): + bit = (corrupted_code >> i) & 1 + if bit: + parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030)) + else: + parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030)) + + # Parity bit computed for the ORIGINAL code (received correctly) + original_ones = bin(original_code).count('1') + parity = original_ones % 2 + parts.append(generate_tone( + FREQ_VIS_BIT_1 if parity else FREQ_VIS_BIT_0, 0.030)) + parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit + + audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)]) + result = detector.feed(audio) + assert result is not None + assert result[0] == 44 + assert result[1] == 'Martin1' + # --------------------------------------------------------------------------- # Mode spec tests @@ -405,7 +520,7 @@ class TestModes: def test_all_vis_codes_have_modes(self): """All defined VIS codes should have matching mode specs.""" - for vis_code in [8, 12, 44, 40, 60, 56, 93, 95]: + for vis_code in [8, 12, 44, 40, 60, 56, 93, 95, 96, 98, 113, 55]: mode = get_mode(vis_code) assert mode is not None, f"No mode for VIS code {vis_code}" diff --git a/utils/sstv/vis.py b/utils/sstv/vis.py index 50a83c2..9694cfd 100644 --- a/utils/sstv/vis.py +++ b/utils/sstv/vis.py @@ -128,6 +128,7 @@ class VISDetector: self._state = VISState.IDLE self._buffer = np.array([], dtype=np.float64) self._tone_counter = 0 + self._miss_counter = 0 self._data_bits: list[int] = [] self._parity_bit: int = 0 self._bit_accumulator: list[np.ndarray] = [] @@ -137,6 +138,7 @@ class VISDetector: self._state = VISState.IDLE self._buffer = np.array([], dtype=np.float64) self._tone_counter = 0 + self._miss_counter = 0 self._data_bits = [] self._parity_bit = 0 self._bit_accumulator = [] @@ -188,10 +190,19 @@ class VISDetector: if self._state == VISState.IDLE: if tone == FREQ_LEADER: self._tone_counter += 1 + self._miss_counter = 0 if self._tone_counter >= self._leader_min_windows: self._state = VISState.LEADER_1 + elif tone is None: + # Ambiguous window (noise/fading) — tolerate up to 3 + # consecutive misses before resetting the leader count. + self._miss_counter += 1 + if self._miss_counter > 3: + self._tone_counter = 0 + self._miss_counter = 0 else: self._tone_counter = 0 + self._miss_counter = 0 elif self._state == VISState.LEADER_1: if tone == FREQ_LEADER: @@ -204,7 +215,15 @@ class VISDetector: self._tone_counter = 1 self._state = VISState.BREAK elif tone is None: - pass # Ambiguous window at tone boundary — stay in state + # Mixed leader+break window? Check if 1200 Hz energy is + # significant relative to 1900 Hz — indicates the break + # pulse is straddling this analysis window. + leader_e = goertzel(window, FREQ_LEADER, self._sample_rate) + sync_e = goertzel(window, FREQ_SYNC, self._sample_rate) + if sync_e > leader_e * 0.5: + self._tone_counter = 1 + self._state = VISState.BREAK + # else: noisy leader window, stay in LEADER_1 else: self._tone_counter = 0 self._state = VISState.IDLE @@ -338,24 +357,42 @@ class VISDetector: def _validate_and_decode(self) -> tuple[int, str] | None: """Validate parity and decode the VIS code. + Includes single-bit error correction for HF noise resilience: + if parity fails, tries recovering by assuming either the parity + bit or exactly one data bit was corrupted. + Returns: (vis_code, mode_name) or None if validation fails. """ if len(self._data_bits) != 8: return None - # VIS uses even parity across 8 data bits + parity bit. - if (sum(self._data_bits) + self._parity_bit) % 2 != 0: - return None + parity_ok = (sum(self._data_bits) + self._parity_bit) % 2 == 0 + vis_code = sum(bit << i for i, bit in enumerate(self._data_bits)) - # Decode VIS code (LSB first) - vis_code = 0 - for i, bit in enumerate(self._data_bits): - vis_code |= bit << i + if parity_ok: + mode_name = VIS_CODES.get(vis_code) + if mode_name is not None: + return vis_code, mode_name + return None # Valid parity but unknown code — not SSTV - # Look up mode + # Parity failed — try error correction + + # Case 1: only the parity bit is wrong (data is correct) mode_name = VIS_CODES.get(vis_code) if mode_name is not None: return vis_code, mode_name + # Case 2: one data bit is wrong — try flipping each + for flip in range(8): + corrected = vis_code ^ (1 << flip) + # Flipping one data bit should fix parity too + corrected_parity_ok = ( + bin(corrected).count('1') + self._parity_bit + ) % 2 == 0 + if corrected_parity_ok: + mode_name = VIS_CODES.get(corrected) + if mode_name is not None: + return corrected, mode_name + return None