mirror of
https://github.com/smittix/intercept.git
synced 2026-05-29 19:49:28 -07:00
fix: Improve HF SSTV VIS detection reliability and error correction
Tolerate intermittent ambiguous windows during leader detection (up to 3 consecutive misses), use energy-based break detection when tone classification fails at leader-break boundary, and add single-bit VIS error correction for parity-bit and data-bit corruption on noisy HF. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -395,6 +395,121 @@ class TestVISDetector:
|
||||
assert vis_code == 8
|
||||
assert mode_name == 'Robot36'
|
||||
|
||||
def test_noisy_leader_detection(self):
|
||||
"""Should detect VIS despite intermittent None windows in leader.
|
||||
|
||||
Simulates HF fading by inserting short silence gaps (which produce
|
||||
ambiguous tone classification) into the leader tone.
|
||||
"""
|
||||
detector = VISDetector()
|
||||
parts = []
|
||||
|
||||
# Build leader1 with gaps: 50ms tone, 10ms silence, repeated
|
||||
# Total ~300ms of leader with interruptions
|
||||
for _ in range(6):
|
||||
parts.append(generate_tone(FREQ_LEADER, 0.050))
|
||||
parts.append(np.zeros(int(SAMPLE_RATE * 0.010))) # 10ms gap
|
||||
|
||||
# Break (1200 Hz, 10ms)
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.010))
|
||||
|
||||
# Leader 2 (clean)
|
||||
parts.append(generate_tone(FREQ_LEADER, 0.300))
|
||||
|
||||
# Start bit + data bits + parity + stop (standard for Robot36 = VIS 8)
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit
|
||||
vis_code = 8
|
||||
ones_count = 0
|
||||
for i in range(8):
|
||||
bit = (vis_code >> i) & 1
|
||||
if bit:
|
||||
ones_count += 1
|
||||
parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030))
|
||||
else:
|
||||
parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030))
|
||||
parity = ones_count % 2
|
||||
parts.append(generate_tone(
|
||||
FREQ_VIS_BIT_1 if parity else FREQ_VIS_BIT_0, 0.030))
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit
|
||||
|
||||
audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)])
|
||||
result = detector.feed(audio)
|
||||
assert result is not None
|
||||
assert result[0] == 8
|
||||
assert result[1] == 'Robot36'
|
||||
|
||||
def test_vis_error_correction_parity_bit(self):
|
||||
"""Should recover when only the parity bit is corrupted."""
|
||||
detector = VISDetector()
|
||||
# Generate Martin1 header (VIS 44) but flip the parity bit
|
||||
parts = []
|
||||
parts.append(generate_tone(FREQ_LEADER, 0.300))
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.010))
|
||||
parts.append(generate_tone(FREQ_LEADER, 0.300))
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit
|
||||
|
||||
vis_code = 44 # Martin1
|
||||
ones_count = 0
|
||||
for i in range(8):
|
||||
bit = (vis_code >> i) & 1
|
||||
if bit:
|
||||
ones_count += 1
|
||||
parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030))
|
||||
else:
|
||||
parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030))
|
||||
|
||||
# Wrong parity (flip it)
|
||||
correct_parity = ones_count % 2
|
||||
wrong_parity = 1 - correct_parity
|
||||
parts.append(generate_tone(
|
||||
FREQ_VIS_BIT_1 if wrong_parity else FREQ_VIS_BIT_0, 0.030))
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit
|
||||
|
||||
audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)])
|
||||
result = detector.feed(audio)
|
||||
assert result is not None
|
||||
assert result[0] == 44
|
||||
assert result[1] == 'Martin1'
|
||||
|
||||
def test_vis_error_correction_data_bit(self):
|
||||
"""Should recover Martin1 when one data bit is flipped by HF noise.
|
||||
|
||||
Simulates: Martin1 (VIS 44) transmitted correctly, but bit 0 is
|
||||
corrupted during reception. The parity bit is received correctly
|
||||
(computed for the original code 44), so parity check fails → error
|
||||
correction tries flipping each data bit and finds VIS 44.
|
||||
"""
|
||||
detector = VISDetector()
|
||||
original_code = 44 # Martin1
|
||||
corrupted_code = 44 ^ 1 # flip bit 0 → 45
|
||||
|
||||
parts = []
|
||||
parts.append(generate_tone(FREQ_LEADER, 0.300))
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.010))
|
||||
parts.append(generate_tone(FREQ_LEADER, 0.300))
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit
|
||||
|
||||
# Transmit corrupted data bits
|
||||
for i in range(8):
|
||||
bit = (corrupted_code >> i) & 1
|
||||
if bit:
|
||||
parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030))
|
||||
else:
|
||||
parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030))
|
||||
|
||||
# Parity bit computed for the ORIGINAL code (received correctly)
|
||||
original_ones = bin(original_code).count('1')
|
||||
parity = original_ones % 2
|
||||
parts.append(generate_tone(
|
||||
FREQ_VIS_BIT_1 if parity else FREQ_VIS_BIT_0, 0.030))
|
||||
parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit
|
||||
|
||||
audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)])
|
||||
result = detector.feed(audio)
|
||||
assert result is not None
|
||||
assert result[0] == 44
|
||||
assert result[1] == 'Martin1'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mode spec tests
|
||||
@@ -405,7 +520,7 @@ class TestModes:
|
||||
|
||||
def test_all_vis_codes_have_modes(self):
|
||||
"""All defined VIS codes should have matching mode specs."""
|
||||
for vis_code in [8, 12, 44, 40, 60, 56, 93, 95]:
|
||||
for vis_code in [8, 12, 44, 40, 60, 56, 93, 95, 96, 98, 113, 55]:
|
||||
mode = get_mode(vis_code)
|
||||
assert mode is not None, f"No mode for VIS code {vis_code}"
|
||||
|
||||
|
||||
@@ -128,6 +128,7 @@ class VISDetector:
|
||||
self._state = VISState.IDLE
|
||||
self._buffer = np.array([], dtype=np.float64)
|
||||
self._tone_counter = 0
|
||||
self._miss_counter = 0
|
||||
self._data_bits: list[int] = []
|
||||
self._parity_bit: int = 0
|
||||
self._bit_accumulator: list[np.ndarray] = []
|
||||
@@ -137,6 +138,7 @@ class VISDetector:
|
||||
self._state = VISState.IDLE
|
||||
self._buffer = np.array([], dtype=np.float64)
|
||||
self._tone_counter = 0
|
||||
self._miss_counter = 0
|
||||
self._data_bits = []
|
||||
self._parity_bit = 0
|
||||
self._bit_accumulator = []
|
||||
@@ -188,10 +190,19 @@ class VISDetector:
|
||||
if self._state == VISState.IDLE:
|
||||
if tone == FREQ_LEADER:
|
||||
self._tone_counter += 1
|
||||
self._miss_counter = 0
|
||||
if self._tone_counter >= self._leader_min_windows:
|
||||
self._state = VISState.LEADER_1
|
||||
elif tone is None:
|
||||
# Ambiguous window (noise/fading) — tolerate up to 3
|
||||
# consecutive misses before resetting the leader count.
|
||||
self._miss_counter += 1
|
||||
if self._miss_counter > 3:
|
||||
self._tone_counter = 0
|
||||
self._miss_counter = 0
|
||||
else:
|
||||
self._tone_counter = 0
|
||||
self._miss_counter = 0
|
||||
|
||||
elif self._state == VISState.LEADER_1:
|
||||
if tone == FREQ_LEADER:
|
||||
@@ -204,7 +215,15 @@ class VISDetector:
|
||||
self._tone_counter = 1
|
||||
self._state = VISState.BREAK
|
||||
elif tone is None:
|
||||
pass # Ambiguous window at tone boundary — stay in state
|
||||
# Mixed leader+break window? Check if 1200 Hz energy is
|
||||
# significant relative to 1900 Hz — indicates the break
|
||||
# pulse is straddling this analysis window.
|
||||
leader_e = goertzel(window, FREQ_LEADER, self._sample_rate)
|
||||
sync_e = goertzel(window, FREQ_SYNC, self._sample_rate)
|
||||
if sync_e > leader_e * 0.5:
|
||||
self._tone_counter = 1
|
||||
self._state = VISState.BREAK
|
||||
# else: noisy leader window, stay in LEADER_1
|
||||
else:
|
||||
self._tone_counter = 0
|
||||
self._state = VISState.IDLE
|
||||
@@ -338,24 +357,42 @@ class VISDetector:
|
||||
def _validate_and_decode(self) -> tuple[int, str] | None:
|
||||
"""Validate parity and decode the VIS code.
|
||||
|
||||
Includes single-bit error correction for HF noise resilience:
|
||||
if parity fails, tries recovering by assuming either the parity
|
||||
bit or exactly one data bit was corrupted.
|
||||
|
||||
Returns:
|
||||
(vis_code, mode_name) or None if validation fails.
|
||||
"""
|
||||
if len(self._data_bits) != 8:
|
||||
return None
|
||||
|
||||
# VIS uses even parity across 8 data bits + parity bit.
|
||||
if (sum(self._data_bits) + self._parity_bit) % 2 != 0:
|
||||
return None
|
||||
parity_ok = (sum(self._data_bits) + self._parity_bit) % 2 == 0
|
||||
vis_code = sum(bit << i for i, bit in enumerate(self._data_bits))
|
||||
|
||||
# Decode VIS code (LSB first)
|
||||
vis_code = 0
|
||||
for i, bit in enumerate(self._data_bits):
|
||||
vis_code |= bit << i
|
||||
if parity_ok:
|
||||
mode_name = VIS_CODES.get(vis_code)
|
||||
if mode_name is not None:
|
||||
return vis_code, mode_name
|
||||
return None # Valid parity but unknown code — not SSTV
|
||||
|
||||
# Look up mode
|
||||
# Parity failed — try error correction
|
||||
|
||||
# Case 1: only the parity bit is wrong (data is correct)
|
||||
mode_name = VIS_CODES.get(vis_code)
|
||||
if mode_name is not None:
|
||||
return vis_code, mode_name
|
||||
|
||||
# Case 2: one data bit is wrong — try flipping each
|
||||
for flip in range(8):
|
||||
corrected = vis_code ^ (1 << flip)
|
||||
# Flipping one data bit should fix parity too
|
||||
corrected_parity_ok = (
|
||||
bin(corrected).count('1') + self._parity_bit
|
||||
) % 2 == 0
|
||||
if corrected_parity_ok:
|
||||
mode_name = VIS_CODES.get(corrected)
|
||||
if mode_name is not None:
|
||||
return corrected, mode_name
|
||||
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user