Files
intercept/tests/test_sstv_decoder.py
2026-02-19 12:18:20 +00:00

948 lines
34 KiB
Python

"""Tests for the pure-Python SSTV decoder.
Covers VIS detection, Goertzel accuracy, mode specs, synthetic image
decoding, and integration with the SSTVDecoder orchestrator.
"""
from __future__ import annotations
import math
import tempfile
import wave
from pathlib import Path
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
from utils.sstv.constants import (
FREQ_BLACK,
FREQ_LEADER,
FREQ_PIXEL_HIGH,
FREQ_PIXEL_LOW,
FREQ_SYNC,
FREQ_VIS_BIT_0,
FREQ_VIS_BIT_1,
FREQ_WHITE,
SAMPLE_RATE,
)
from utils.sstv.dsp import (
estimate_frequency,
freq_to_pixel,
goertzel,
goertzel_batch,
goertzel_mag,
normalize_audio,
samples_for_duration,
)
from utils.sstv.modes import (
ALL_MODES,
MARTIN_1,
PD_120,
PD_180,
ROBOT_36,
ROBOT_72,
SCOTTIE_1,
ColorModel,
SyncPosition,
get_mode,
get_mode_by_name,
)
from utils.sstv.sstv_decoder import (
DecodeProgress,
DopplerInfo,
SSTVDecoder,
SSTVImage,
get_sstv_decoder,
is_sstv_available,
)
from utils.sstv.vis import VISDetector, VISState
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def generate_tone(freq: float, duration_s: float,
sample_rate: int = SAMPLE_RATE,
amplitude: float = 0.8) -> np.ndarray:
"""Generate a pure sine tone."""
t = np.arange(int(duration_s * sample_rate)) / sample_rate
return amplitude * np.sin(2 * np.pi * freq * t)
def generate_vis_header(vis_code: int, sample_rate: int = SAMPLE_RATE) -> np.ndarray:
"""Generate a synthetic VIS header for a given code.
Structure: leader1 (300ms) + break (10ms) + leader2 (300ms)
+ start_bit (30ms) + 8 data bits (30ms each)
+ parity bit (30ms) + stop_bit (30ms)
"""
parts = []
# Leader 1 (1900 Hz, 300ms)
parts.append(generate_tone(FREQ_LEADER, 0.300, sample_rate))
# Break (1200 Hz, 10ms)
parts.append(generate_tone(FREQ_SYNC, 0.010, sample_rate))
# Leader 2 (1900 Hz, 300ms)
parts.append(generate_tone(FREQ_LEADER, 0.300, sample_rate))
# Start bit (1200 Hz, 30ms)
parts.append(generate_tone(FREQ_SYNC, 0.030, sample_rate))
# 8 data bits (LSB first)
ones_count = 0
for i in range(8):
bit = (vis_code >> i) & 1
if bit:
ones_count += 1
parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030, sample_rate))
else:
parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030, sample_rate))
# Even parity bit
parity = ones_count % 2
if parity:
parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030, sample_rate))
else:
parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030, sample_rate))
# Stop bit (1200 Hz, 30ms)
parts.append(generate_tone(FREQ_SYNC, 0.030, sample_rate))
return np.concatenate(parts)
# ---------------------------------------------------------------------------
# Goertzel / DSP tests
# ---------------------------------------------------------------------------
class TestGoertzel:
"""Tests for the Goertzel algorithm."""
def test_detects_exact_frequency(self):
"""Goertzel should have peak energy at the generated frequency."""
tone = generate_tone(1200.0, 0.01)
energy_1200 = goertzel(tone, 1200.0)
energy_1500 = goertzel(tone, 1500.0)
energy_1900 = goertzel(tone, 1900.0)
assert energy_1200 > energy_1500 * 5
assert energy_1200 > energy_1900 * 5
def test_different_frequencies(self):
"""Each candidate frequency should produce peak at its own freq."""
for freq in [1100, 1200, 1300, 1500, 1900, 2300]:
tone = generate_tone(float(freq), 0.01)
energy = goertzel(tone, float(freq))
# Should have significant energy at the target
assert energy > 0
def test_empty_samples(self):
"""Goertzel on empty array should return 0."""
assert goertzel(np.array([], dtype=np.float64), 1200.0) == 0.0
def test_goertzel_mag(self):
"""goertzel_mag should return sqrt of energy."""
tone = generate_tone(1200.0, 0.01)
energy = goertzel(tone, 1200.0)
mag = goertzel_mag(tone, 1200.0)
assert abs(mag - math.sqrt(energy)) < 1e-10
class TestEstimateFrequency:
"""Tests for frequency estimation."""
def test_estimates_known_frequency(self):
"""Should accurately estimate a known tone frequency."""
tone = generate_tone(1900.0, 0.02)
estimated = estimate_frequency(tone, 1000.0, 2500.0)
assert abs(estimated - 1900.0) <= 30.0
def test_estimates_black_level(self):
"""Should detect the black level frequency."""
tone = generate_tone(FREQ_BLACK, 0.02)
estimated = estimate_frequency(tone, 1400.0, 1600.0)
assert abs(estimated - FREQ_BLACK) <= 30.0
def test_estimates_white_level(self):
"""Should detect the white level frequency."""
tone = generate_tone(FREQ_WHITE, 0.02)
estimated = estimate_frequency(tone, 2200.0, 2400.0)
assert abs(estimated - FREQ_WHITE) <= 30.0
def test_empty_samples(self):
"""Should return 0 for empty input."""
assert estimate_frequency(np.array([], dtype=np.float64)) == 0.0
class TestFreqToPixel:
"""Tests for frequency-to-pixel mapping."""
def test_black_level(self):
"""1500 Hz should map to 0 (black)."""
assert freq_to_pixel(FREQ_PIXEL_LOW) == 0
def test_white_level(self):
"""2300 Hz should map to 255 (white)."""
assert freq_to_pixel(FREQ_PIXEL_HIGH) == 255
def test_midpoint(self):
"""Middle frequency should map to approximately 128."""
mid_freq = (FREQ_PIXEL_LOW + FREQ_PIXEL_HIGH) / 2
pixel = freq_to_pixel(mid_freq)
assert 120 <= pixel <= 135
def test_below_black_clamps(self):
"""Frequencies below black level should clamp to 0."""
assert freq_to_pixel(1000.0) == 0
def test_above_white_clamps(self):
"""Frequencies above white level should clamp to 255."""
assert freq_to_pixel(3000.0) == 255
class TestNormalizeAudio:
"""Tests for int16 to float64 normalization."""
def test_max_positive(self):
"""int16 max should normalize to ~1.0."""
raw = np.array([32767], dtype=np.int16)
result = normalize_audio(raw)
assert abs(result[0] - (32767.0 / 32768.0)) < 1e-10
def test_zero(self):
"""int16 zero should normalize to 0.0."""
raw = np.array([0], dtype=np.int16)
result = normalize_audio(raw)
assert result[0] == 0.0
def test_negative(self):
"""int16 min should normalize to -1.0."""
raw = np.array([-32768], dtype=np.int16)
result = normalize_audio(raw)
assert result[0] == -1.0
class TestSamplesForDuration:
"""Tests for duration-to-samples calculation."""
def test_one_second(self):
"""1 second at 48kHz should be 48000 samples."""
assert samples_for_duration(1.0) == 48000
def test_five_ms(self):
"""5ms at 48kHz should be 240 samples."""
assert samples_for_duration(0.005) == 240
def test_custom_rate(self):
"""Should work with custom sample rates."""
assert samples_for_duration(1.0, 22050) == 22050
class TestGoertzelBatch:
"""Tests for the vectorized batch Goertzel function."""
def test_matches_scalar_goertzel(self):
"""Batch result should match individual goertzel calls."""
rng = np.random.default_rng(42)
# 10 pixel windows of 20 samples each
audio_matrix = rng.standard_normal((10, 20))
freqs = np.array([1200.0, 1500.0, 1900.0, 2300.0])
batch_result = goertzel_batch(audio_matrix, freqs)
assert batch_result.shape == (10, 4)
for i in range(10):
for j, f in enumerate(freqs):
scalar = goertzel(audio_matrix[i], f)
assert abs(batch_result[i, j] - scalar) < 1e-6, \
f"Mismatch at pixel {i}, freq {f}"
def test_detects_correct_frequency(self):
"""Batch should find peak at the correct frequency for each pixel.
Uses 96-sample windows (2ms at 48kHz) matching the decoder's
minimum analysis window, with 5Hz resolution.
"""
freqs = np.arange(1400.0, 2405.0, 5.0) # 5Hz step, same as decoder
window_size = 96 # Matches _MIN_ANALYSIS_WINDOW
pixels = []
for target in [1500.0, 1900.0, 2300.0]:
t = np.arange(window_size) / SAMPLE_RATE
pixels.append(0.8 * np.sin(2 * np.pi * target * t))
audio_matrix = np.array(pixels)
energies = goertzel_batch(audio_matrix, freqs)
best_idx = np.argmax(energies, axis=1)
best_freqs = freqs[best_idx]
# With 96 samples, frequency accuracy is within ~25 Hz
assert abs(best_freqs[0] - 1500.0) <= 30.0
assert abs(best_freqs[1] - 1900.0) <= 30.0
assert abs(best_freqs[2] - 2300.0) <= 30.0
def test_empty_input(self):
"""Should handle empty inputs gracefully."""
result = goertzel_batch(np.zeros((0, 10)), np.array([1200.0]))
assert result.shape == (0, 1)
result = goertzel_batch(np.zeros((5, 10)), np.array([]))
assert result.shape == (5, 0)
# ---------------------------------------------------------------------------
# VIS detection tests
# ---------------------------------------------------------------------------
class TestVISDetector:
"""Tests for VIS header detection."""
def test_initial_state(self):
"""Detector should start in IDLE state."""
detector = VISDetector()
assert detector.state == VISState.IDLE
def test_reset(self):
"""Reset should return to IDLE state."""
detector = VISDetector()
# Feed some leader tone to change state
detector.feed(generate_tone(FREQ_LEADER, 0.250))
detector.reset()
assert detector.state == VISState.IDLE
def test_detect_robot36(self):
"""Should detect Robot36 VIS code (8)."""
detector = VISDetector()
header = generate_vis_header(8) # Robot36
# Add some silence before and after
audio = np.concatenate([
np.zeros(2400),
header,
np.zeros(2400),
])
result = detector.feed(audio)
assert result is not None
vis_code, mode_name = result
assert vis_code == 8
assert mode_name == 'Robot36'
def test_detect_martin1(self):
"""Should detect Martin1 VIS code (44)."""
detector = VISDetector()
header = generate_vis_header(44) # Martin1
audio = np.concatenate([np.zeros(2400), header, np.zeros(2400)])
result = detector.feed(audio)
assert result is not None
vis_code, mode_name = result
assert vis_code == 44
assert mode_name == 'Martin1'
def test_detect_scottie1(self):
"""Should detect Scottie1 VIS code (60)."""
detector = VISDetector()
header = generate_vis_header(60) # Scottie1
audio = np.concatenate([np.zeros(2400), header, np.zeros(2400)])
result = detector.feed(audio)
assert result is not None
vis_code, mode_name = result
assert vis_code == 60
assert mode_name == 'Scottie1'
def test_detect_pd120(self):
"""Should detect PD120 VIS code (95)."""
detector = VISDetector()
header = generate_vis_header(95) # PD120
audio = np.concatenate([np.zeros(2400), header, np.zeros(2400)])
result = detector.feed(audio)
assert result is not None
vis_code, mode_name = result
assert vis_code == 95
assert mode_name == 'PD120'
def test_noise_rejection(self):
"""Should not falsely detect VIS in noise."""
detector = VISDetector()
rng = np.random.default_rng(42)
noise = rng.standard_normal(48000) * 0.1 # 1 second of noise
result = detector.feed(noise)
assert result is None
def test_incremental_feeding(self):
"""Should work with small chunks fed incrementally."""
detector = VISDetector()
header = generate_vis_header(8)
audio = np.concatenate([np.zeros(2400), header, np.zeros(2400)])
# Feed in small chunks (100 samples each)
chunk_size = 100
result = None
offset = 0
while offset < len(audio):
chunk = audio[offset:offset + chunk_size]
offset += chunk_size
result = detector.feed(chunk)
if result is not None:
break
assert result is not None
vis_code, mode_name = result
assert vis_code == 8
assert mode_name == 'Robot36'
def test_noisy_leader_detection(self):
"""Should detect VIS despite intermittent None windows in leader.
Simulates HF fading by inserting short silence gaps (which produce
ambiguous tone classification) into the leader tone.
"""
detector = VISDetector()
parts = []
# Build leader1 with gaps: 50ms tone, 10ms silence, repeated
# Total ~300ms of leader with interruptions
for _ in range(6):
parts.append(generate_tone(FREQ_LEADER, 0.050))
parts.append(np.zeros(int(SAMPLE_RATE * 0.010))) # 10ms gap
# Break (1200 Hz, 10ms)
parts.append(generate_tone(FREQ_SYNC, 0.010))
# Leader 2 (clean)
parts.append(generate_tone(FREQ_LEADER, 0.300))
# Start bit + data bits + parity + stop (standard for Robot36 = VIS 8)
parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit
vis_code = 8
ones_count = 0
for i in range(8):
bit = (vis_code >> i) & 1
if bit:
ones_count += 1
parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030))
else:
parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030))
parity = ones_count % 2
parts.append(generate_tone(
FREQ_VIS_BIT_1 if parity else FREQ_VIS_BIT_0, 0.030))
parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit
audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)])
result = detector.feed(audio)
assert result is not None
assert result[0] == 8
assert result[1] == 'Robot36'
def test_vis_error_correction_parity_bit(self):
"""Should recover when only the parity bit is corrupted."""
detector = VISDetector()
# Generate Martin1 header (VIS 44) but flip the parity bit
parts = []
parts.append(generate_tone(FREQ_LEADER, 0.300))
parts.append(generate_tone(FREQ_SYNC, 0.010))
parts.append(generate_tone(FREQ_LEADER, 0.300))
parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit
vis_code = 44 # Martin1
ones_count = 0
for i in range(8):
bit = (vis_code >> i) & 1
if bit:
ones_count += 1
parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030))
else:
parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030))
# Wrong parity (flip it)
correct_parity = ones_count % 2
wrong_parity = 1 - correct_parity
parts.append(generate_tone(
FREQ_VIS_BIT_1 if wrong_parity else FREQ_VIS_BIT_0, 0.030))
parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit
audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)])
result = detector.feed(audio)
assert result is not None
assert result[0] == 44
assert result[1] == 'Martin1'
def test_vis_error_correction_data_bit(self):
"""Should recover Martin1 when one data bit is flipped by HF noise.
Simulates: Martin1 (VIS 44) transmitted correctly, but bit 0 is
corrupted during reception. The parity bit is received correctly
(computed for the original code 44), so parity check fails → error
correction tries flipping each data bit and finds VIS 44.
"""
detector = VISDetector()
original_code = 44 # Martin1
corrupted_code = 44 ^ 1 # flip bit 0 → 45
parts = []
parts.append(generate_tone(FREQ_LEADER, 0.300))
parts.append(generate_tone(FREQ_SYNC, 0.010))
parts.append(generate_tone(FREQ_LEADER, 0.300))
parts.append(generate_tone(FREQ_SYNC, 0.030)) # start bit
# Transmit corrupted data bits
for i in range(8):
bit = (corrupted_code >> i) & 1
if bit:
parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030))
else:
parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030))
# Parity bit computed for the ORIGINAL code (received correctly)
original_ones = bin(original_code).count('1')
parity = original_ones % 2
parts.append(generate_tone(
FREQ_VIS_BIT_1 if parity else FREQ_VIS_BIT_0, 0.030))
parts.append(generate_tone(FREQ_SYNC, 0.030)) # stop bit
audio = np.concatenate([np.zeros(2400)] + parts + [np.zeros(2400)])
result = detector.feed(audio)
assert result is not None
assert result[0] == 44
assert result[1] == 'Martin1'
# ---------------------------------------------------------------------------
# Mode spec tests
# ---------------------------------------------------------------------------
class TestModes:
"""Tests for SSTV mode specifications."""
def test_all_vis_codes_have_modes(self):
"""All defined VIS codes should have matching mode specs."""
for vis_code in [8, 12, 44, 40, 60, 56, 95, 97, 99, 98, 96, 76]:
mode = get_mode(vis_code)
assert mode is not None, f"No mode for VIS code {vis_code}"
def test_robot36_spec(self):
"""Robot36 should have correct dimensions and timing."""
assert ROBOT_36.width == 320
assert ROBOT_36.height == 240
assert ROBOT_36.vis_code == 8
assert ROBOT_36.color_model == ColorModel.YCRCB
assert ROBOT_36.has_half_rate_chroma is True
assert ROBOT_36.sync_position == SyncPosition.FRONT
def test_martin1_spec(self):
"""Martin1 should have correct dimensions."""
assert MARTIN_1.width == 320
assert MARTIN_1.height == 256
assert MARTIN_1.vis_code == 44
assert MARTIN_1.color_model == ColorModel.RGB
assert len(MARTIN_1.channels) == 3
def test_scottie1_spec(self):
"""Scottie1 should have middle sync position."""
assert SCOTTIE_1.sync_position == SyncPosition.MIDDLE
assert SCOTTIE_1.width == 320
assert SCOTTIE_1.height == 256
def test_pd120_spec(self):
"""PD120 should have dual-luminance YCrCb."""
assert PD_120.width == 640
assert PD_120.height == 496
assert PD_120.color_model == ColorModel.YCRCB_DUAL
assert len(PD_120.channels) == 4 # Y1, Cr, Cb, Y2
def test_get_mode_unknown(self):
"""Unknown VIS code should return None."""
assert get_mode(999) is None
def test_get_mode_by_name(self):
"""Should look up modes by name."""
mode = get_mode_by_name('Robot36')
assert mode is not None
assert mode.vis_code == 8
def test_mode_by_name_unknown(self):
"""Unknown mode name should return None."""
assert get_mode_by_name('FakeMode') is None
def test_robot72_spec(self):
"""Robot72 should have 3 channels and full-rate chroma."""
assert ROBOT_72.width == 320
assert ROBOT_72.height == 240
assert ROBOT_72.vis_code == 12
assert ROBOT_72.color_model == ColorModel.YCRCB
assert ROBOT_72.has_half_rate_chroma is False
assert len(ROBOT_72.channels) == 3 # Y, Cr, Cb
assert ROBOT_72.channel_separator_ms == 6.0
def test_robot36_separator(self):
"""Robot36 should have a 6ms separator between Y and chroma."""
assert ROBOT_36.channel_separator_ms == 6.0
assert ROBOT_36.has_half_rate_chroma is True
assert len(ROBOT_36.channels) == 2 # Y, alternating Cr/Cb
def test_pd120_channel_timings(self):
"""PD120 channel durations should sum to line_duration minus sync+porch."""
channel_sum = sum(ch.duration_ms for ch in PD_120.channels)
expected = PD_120.line_duration_ms - PD_120.sync_duration_ms - PD_120.sync_porch_ms
assert abs(channel_sum - expected) < 0.1, \
f"PD120 channels sum to {channel_sum}ms, expected {expected}ms"
def test_pd180_channel_timings(self):
"""PD180 channel durations should sum to line_duration minus sync+porch."""
channel_sum = sum(ch.duration_ms for ch in PD_180.channels)
expected = PD_180.line_duration_ms - PD_180.sync_duration_ms - PD_180.sync_porch_ms
assert abs(channel_sum - expected) < 0.1, \
f"PD180 channels sum to {channel_sum}ms, expected {expected}ms"
def test_robot36_timing_consistency(self):
"""Robot36 total channel + sync + porch + separator should equal line_duration."""
total = (ROBOT_36.sync_duration_ms + ROBOT_36.sync_porch_ms
+ sum(ch.duration_ms for ch in ROBOT_36.channels)
+ ROBOT_36.channel_separator_ms) # 1 separator for 2 channels
assert abs(total - ROBOT_36.line_duration_ms) < 0.1
def test_robot72_timing_consistency(self):
"""Robot72 total should equal line_duration."""
# 3 channels with 2 separators
total = (ROBOT_72.sync_duration_ms + ROBOT_72.sync_porch_ms
+ sum(ch.duration_ms for ch in ROBOT_72.channels)
+ ROBOT_72.channel_separator_ms * 2)
assert abs(total - ROBOT_72.line_duration_ms) < 0.1
def test_all_modes_have_positive_dimensions(self):
"""All modes should have positive width and height."""
for _vis_code, mode in ALL_MODES.items():
assert mode.width > 0, f"{mode.name} has invalid width"
assert mode.height > 0, f"{mode.name} has invalid height"
assert mode.line_duration_ms > 0, f"{mode.name} has invalid line duration"
# ---------------------------------------------------------------------------
# Image decoder tests
# ---------------------------------------------------------------------------
class TestImageDecoder:
"""Tests for the SSTV image decoder."""
def test_creates_decoder(self):
"""Should create an image decoder for any supported mode."""
from utils.sstv.image_decoder import SSTVImageDecoder
decoder = SSTVImageDecoder(ROBOT_36)
assert decoder.is_complete is False
assert decoder.current_line == 0
assert decoder.total_lines == 240
def test_pd120_dual_luminance_lines(self):
"""PD120 decoder should expect half the image height in audio lines."""
from utils.sstv.image_decoder import SSTVImageDecoder
decoder = SSTVImageDecoder(PD_120)
assert decoder.total_lines == 248 # 496 / 2
def test_progress_percent(self):
"""Progress should start at 0."""
from utils.sstv.image_decoder import SSTVImageDecoder
decoder = SSTVImageDecoder(ROBOT_36)
assert decoder.progress_percent == 0
def test_synthetic_robot36_decode(self):
"""Should decode a synthetic Robot36 image (all white)."""
pytest.importorskip('PIL')
from utils.sstv.image_decoder import SSTVImageDecoder
decoder = SSTVImageDecoder(ROBOT_36)
# Generate synthetic scanlines (all white = 2300 Hz)
# Each line: sync(9ms) + porch(3ms) + Y(88ms) + separator(6ms) + Cr/Cb(44ms)
for _line in range(240):
parts = []
# Sync pulse
parts.append(generate_tone(FREQ_SYNC, 0.009))
# Porch
parts.append(generate_tone(FREQ_BLACK, 0.003))
# Y channel (white = 2300 Hz)
parts.append(generate_tone(FREQ_WHITE, 0.088))
# Separator + porch (6ms)
parts.append(generate_tone(FREQ_BLACK, 0.006))
# Chroma channel (mid value = 1900 Hz ~ 128)
parts.append(generate_tone(1900.0, 0.044))
# Pad to line duration
line_audio = np.concatenate(parts)
line_samples = samples_for_duration(ROBOT_36.line_duration_ms / 1000.0)
if len(line_audio) < line_samples:
line_audio = np.concatenate([
line_audio,
np.zeros(line_samples - len(line_audio))
])
decoder.feed(line_audio)
assert decoder.is_complete
img = decoder.get_image()
assert img is not None
assert img.size == (320, 240)
def test_slant_correction_wraps_rows_without_blank_wedge(self):
"""Slant correction should rotate rows, not introduce black fill."""
PIL = pytest.importorskip('PIL')
from utils.sstv.image_decoder import SSTVImageDecoder
decoder = SSTVImageDecoder(SCOTTIE_1)
decoder._sync_deviations = [float(i * 4) for i in range(SCOTTIE_1.height)]
source = np.full((SCOTTIE_1.height, SCOTTIE_1.width, 3), 128, dtype=np.uint8)
img = PIL.Image.fromarray(source, 'RGB')
corrected = decoder._apply_slant_correction(img)
corrected_arr = np.array(corrected)
# If correction clips/fills, zeros appear. Circular shift should preserve all values.
assert corrected_arr.min() == 128
assert corrected_arr.max() == 128
def test_slant_correction_skips_implausible_drift(self):
"""Very large estimated drift should be treated as a bad fit and ignored."""
PIL = pytest.importorskip('PIL')
from utils.sstv.image_decoder import SSTVImageDecoder
decoder = SSTVImageDecoder(SCOTTIE_1)
decoder._sync_deviations = [float(i * 40) for i in range(SCOTTIE_1.height)]
source = np.full((SCOTTIE_1.height, SCOTTIE_1.width, 3), 177, dtype=np.uint8)
img = PIL.Image.fromarray(source, 'RGB')
corrected = decoder._apply_slant_correction(img)
# Implausible slope should return original image unchanged.
assert np.array_equal(np.array(corrected), source)
# ---------------------------------------------------------------------------
# SSTVDecoder orchestrator tests
# ---------------------------------------------------------------------------
class TestSSTVDecoder:
"""Tests for the SSTVDecoder orchestrator."""
def test_decoder_available(self):
"""Python decoder should always be available."""
decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
assert decoder.decoder_available == 'python-sstv'
def test_is_sstv_available(self):
"""is_sstv_available() should always return True."""
assert is_sstv_available() is True
def test_not_running_initially(self):
"""Decoder should not be running on creation."""
decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
assert decoder.is_running is False
def test_doppler_disabled_by_default(self):
"""Doppler should be disabled by default."""
decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
assert decoder.doppler_enabled is False
assert decoder.last_doppler_info is None
def test_stop_when_not_running(self):
"""Stop should be safe to call when not running."""
decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
decoder.stop() # Should not raise
def test_set_callback(self):
"""Should accept a callback function."""
decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
cb = MagicMock()
decoder.set_callback(cb)
# Trigger a progress emit
decoder._emit_progress(DecodeProgress(status='detecting'))
cb.assert_called_once()
def test_get_images_empty(self):
"""Should return empty list initially."""
decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
images = decoder.get_images()
assert images == []
def test_decode_file_not_found(self):
"""Should raise FileNotFoundError for missing file."""
decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
with pytest.raises(FileNotFoundError):
decoder.decode_file('/nonexistent/audio.wav')
def test_decode_file_with_synthetic_wav(self):
"""Should process a WAV file through the decode pipeline."""
pytest.importorskip('PIL')
output_dir = tempfile.mkdtemp()
decoder = SSTVDecoder(output_dir=output_dir)
# Generate a synthetic WAV with a VIS header + short image data
vis_header = generate_vis_header(8) # Robot36
# Add 240 lines of image data after the header
image_lines = []
for _line in range(240):
parts = []
parts.append(generate_tone(FREQ_SYNC, 0.009))
parts.append(generate_tone(FREQ_BLACK, 0.003))
parts.append(generate_tone(1900.0, 0.088)) # mid-gray Y
parts.append(generate_tone(FREQ_BLACK, 0.006)) # separator
parts.append(generate_tone(1900.0, 0.044)) # chroma
line_audio = np.concatenate(parts)
line_samples = samples_for_duration(ROBOT_36.line_duration_ms / 1000.0)
if len(line_audio) < line_samples:
line_audio = np.concatenate([
line_audio,
np.zeros(line_samples - len(line_audio))
])
image_lines.append(line_audio)
audio = np.concatenate([
np.zeros(4800), # 100ms silence
vis_header,
*image_lines,
np.zeros(4800),
])
# Write WAV file
wav_path = Path(output_dir) / 'test_input.wav'
raw_int16 = (audio * 32767).astype(np.int16)
with wave.open(str(wav_path), 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(raw_int16.tobytes())
images = decoder.decode_file(wav_path)
assert len(images) >= 1
assert images[0].mode == 'Robot36'
assert Path(images[0].path).exists()
# ---------------------------------------------------------------------------
# Dataclass tests
# ---------------------------------------------------------------------------
class TestDataclasses:
"""Tests for dataclass serialization."""
def test_decode_progress_to_dict(self):
"""DecodeProgress should serialize correctly."""
progress = DecodeProgress(
status='decoding',
mode='Robot36',
progress_percent=50,
message='Halfway done',
)
d = progress.to_dict()
assert d['type'] == 'sstv_progress'
assert d['status'] == 'decoding'
assert d['mode'] == 'Robot36'
assert d['progress'] == 50
assert d['message'] == 'Halfway done'
def test_decode_progress_minimal(self):
"""DecodeProgress with only status should omit optional fields."""
progress = DecodeProgress(status='detecting')
d = progress.to_dict()
assert 'mode' not in d
assert 'message' not in d
assert 'image' not in d
def test_sstv_image_to_dict(self):
"""SSTVImage should serialize with URL."""
from datetime import datetime, timezone
image = SSTVImage(
filename='test.png',
path=Path('/tmp/test.png'),
mode='Robot36',
timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
frequency=145.800,
size_bytes=1234,
)
d = image.to_dict()
assert d['filename'] == 'test.png'
assert d['mode'] == 'Robot36'
assert d['url'] == '/sstv/images/test.png'
def test_doppler_info_to_dict(self):
"""DopplerInfo should serialize with rounding."""
from datetime import datetime, timezone
info = DopplerInfo(
frequency_hz=145800123.456,
shift_hz=123.456,
range_rate_km_s=-1.23456,
elevation=45.678,
azimuth=180.123,
timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
d = info.to_dict()
assert d['shift_hz'] == 123.5
assert d['range_rate_km_s'] == -1.235
assert d['elevation'] == 45.7
# ---------------------------------------------------------------------------
# Integration tests
# ---------------------------------------------------------------------------
class TestIntegration:
"""Integration tests verifying the package works as a drop-in replacement."""
def test_import_from_utils_sstv(self):
"""Routes should be able to import from utils.sstv."""
from utils.sstv import (
ISS_SSTV_FREQ,
is_sstv_available,
)
assert ISS_SSTV_FREQ == 145.800
assert is_sstv_available() is True
def test_sstv_modes_constant(self):
"""SSTV_MODES list should be importable."""
from utils.sstv import SSTV_MODES
assert 'Robot36' in SSTV_MODES
assert 'Martin1' in SSTV_MODES
assert 'PD120' in SSTV_MODES
def test_decoder_singleton(self):
"""get_sstv_decoder should return a valid decoder."""
# Reset the global singleton for test isolation
import utils.sstv.sstv_decoder as mod
old = mod._decoder
mod._decoder = None
try:
decoder = get_sstv_decoder()
assert decoder is not None
assert decoder.decoder_available == 'python-sstv'
finally:
mod._decoder = old
@patch('subprocess.Popen')
def test_start_creates_subprocess(self, mock_popen):
"""start() should create an rtl_fm subprocess."""
mock_process = MagicMock()
mock_process.stdout = MagicMock()
mock_process.stdout.read = MagicMock(return_value=b'')
mock_process.stderr = MagicMock()
mock_popen.return_value = mock_process
decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
success = decoder.start(frequency=145.800, device_index=0)
assert success is True
assert decoder.is_running is True
# Verify rtl_fm was called
mock_popen.assert_called_once()
cmd = mock_popen.call_args[0][0]
assert cmd[0] == 'rtl_fm'
assert '-f' in cmd
assert '-M' in cmd
decoder.stop()
assert decoder.is_running is False