diff --git a/pyproject.toml b/pyproject.toml
index c00f283..1040a65 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -57,6 +57,7 @@ optionals = [
"scipy>=1.10.0",
"qrcode[pil]>=7.4",
"numpy>=1.24.0",
+ "Pillow>=9.0.0",
"meshtastic>=2.0.0",
"psycopg2-binary>=2.9.9",
"scapy>=2.4.5",
diff --git a/requirements.txt b/requirements.txt
index ae63a67..911afde 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,10 +13,13 @@ bleak>=0.21.0
# Satellite tracking (optional - only needed for satellite features)
skyfield>=1.45
-# DSC decoding (optional - only needed for VHF DSC maritime distress)
+# DSC decoding and SSTV decoding (DSP pipeline)
scipy>=1.10.0
numpy>=1.24.0
+# SSTV image output (optional - needed for SSTV image decoding)
+Pillow>=9.0.0
+
# GPS dongle support (optional - only needed for USB GPS receivers)
pyserial>=3.5
diff --git a/routes/sstv.py b/routes/sstv.py
index 1640fb8..74bd47c 100644
--- a/routes/sstv.py
+++ b/routes/sstv.py
@@ -94,7 +94,7 @@ def start_decoder():
if not is_sstv_available():
return jsonify({
'status': 'error',
- 'message': 'SSTV decoder not available. Install slowrx: apt install slowrx'
+ 'message': 'SSTV decoder not available. Install numpy and Pillow: pip install numpy Pillow'
}), 400
decoder = get_sstv_decoder()
diff --git a/routes/sstv_general.py b/routes/sstv_general.py
index 4d8a8d7..359e7d1 100644
--- a/routes/sstv_general.py
+++ b/routes/sstv_general.py
@@ -99,7 +99,7 @@ def start_decoder():
if decoder.decoder_available is None:
return jsonify({
'status': 'error',
- 'message': 'SSTV decoder not available. Install slowrx: apt install slowrx',
+ 'message': 'SSTV decoder not available. Install numpy and Pillow: pip install numpy Pillow',
}), 400
if decoder.is_running:
diff --git a/setup.sh b/setup.sh
index cadde64..7cdf6f1 100755
--- a/setup.sh
+++ b/setup.sh
@@ -204,8 +204,6 @@ check_tools() {
check_required "dump1090" "ADS-B decoder" dump1090
check_required "acarsdec" "ACARS decoder" acarsdec
check_required "AIS-catcher" "AIS vessel decoder" AIS-catcher aiscatcher
- check_optional "slowrx" "SSTV decoder (ISS images)" slowrx
-
echo
info "GPS:"
check_required "gpsd" "GPS daemon" gpsd
@@ -390,42 +388,6 @@ install_rtlamr_from_source() {
fi
}
-install_slowrx_from_source_macos() {
- info "slowrx not available via Homebrew. Building from source..."
-
- # Ensure build dependencies are installed
- brew_install fftw
- brew_install libsndfile
- brew_install gtk+3
- brew_install pkg-config
-
- (
- tmp_dir="$(mktemp -d)"
- trap 'rm -rf "$tmp_dir"' EXIT
-
- info "Cloning slowrx..."
- git clone --depth 1 https://github.com/windytan/slowrx.git "$tmp_dir/slowrx" >/dev/null 2>&1 \
- || { warn "Failed to clone slowrx"; exit 1; }
-
- cd "$tmp_dir/slowrx"
- info "Compiling slowrx..."
- # slowrx uses a plain Makefile, not CMake
- local make_log
- make_log=$(make 2>&1) || {
- warn "make failed for slowrx:"
- echo "$make_log" | tail -20
- exit 1
- }
-
- # Install to /usr/local/bin
- if [[ -w /usr/local/bin ]]; then
- install -m 0755 slowrx /usr/local/bin/slowrx
- else
- sudo install -m 0755 slowrx /usr/local/bin/slowrx
- fi
- ok "slowrx installed successfully from source"
- )
-}
install_multimon_ng_from_source_macos() {
info "multimon-ng not available via Homebrew. Building from source..."
@@ -663,8 +625,8 @@ install_macos_packages() {
progress "Installing direwolf (APRS decoder)"
(brew_install direwolf) || warn "direwolf not available via Homebrew"
- progress "Skipping slowrx (SSTV decoder)"
- warn "slowrx requires ALSA (Linux-only) and cannot build on macOS. Skipping."
+ progress "SSTV decoder"
+ ok "SSTV uses built-in pure Python decoder (no external tools needed)"
progress "Installing DSD (Digital Speech Decoder, optional)"
if ! cmd_exists dsd && ! cmd_exists dsd-fme; then
@@ -882,37 +844,6 @@ install_aiscatcher_from_source_debian() {
)
}
-install_slowrx_from_source_debian() {
- info "slowrx not available via APT. Building from source..."
-
- # slowrx uses a simple Makefile, not CMake
- apt_install build-essential git pkg-config \
- libfftw3-dev libsndfile1-dev libgtk-3-dev libasound2-dev libpulse-dev
-
- # Run in subshell to isolate EXIT trap
- (
- tmp_dir="$(mktemp -d)"
- trap 'rm -rf "$tmp_dir"' EXIT
-
- info "Cloning slowrx..."
- git clone --depth 1 https://github.com/windytan/slowrx.git "$tmp_dir/slowrx" >/dev/null 2>&1 \
- || { warn "Failed to clone slowrx"; exit 1; }
-
- cd "$tmp_dir/slowrx"
-
- info "Compiling slowrx..."
- local make_log
- make_log=$(make 2>&1) || {
- warn "make failed for slowrx:"
- echo "$make_log" | tail -20
- warn "ISS SSTV decoding will not be available."
- exit 1
- }
- $SUDO install -m 0755 slowrx /usr/local/bin/slowrx
- ok "slowrx installed successfully."
- )
-}
-
install_ubertooth_from_source_debian() {
info "Building Ubertooth from source..."
@@ -1104,8 +1035,8 @@ install_debian_packages() {
progress "Installing direwolf (APRS decoder)"
apt_install direwolf || true
- progress "Installing slowrx (SSTV decoder)"
- apt_install slowrx || cmd_exists slowrx || install_slowrx_from_source_debian || warn "slowrx not available. ISS SSTV decoding will not be available."
+ progress "SSTV decoder"
+ ok "SSTV uses built-in pure Python decoder (no external tools needed)"
progress "Installing DSD (Digital Speech Decoder, optional)"
if ! cmd_exists dsd && ! cmd_exists dsd-fme; then
diff --git a/static/js/modes/sstv-general.js b/static/js/modes/sstv-general.js
index aa977a9..020ee18 100644
--- a/static/js/modes/sstv-general.js
+++ b/static/js/modes/sstv-general.js
@@ -52,7 +52,7 @@ const SSTVGeneral = (function() {
if (!data.available) {
updateStatusUI('unavailable', 'Decoder not installed');
- showStatusMessage('SSTV decoder not available. Install slowrx: apt install slowrx', 'warning');
+ showStatusMessage('SSTV decoder not available. Install numpy and Pillow: pip install numpy Pillow', 'warning');
return;
}
diff --git a/static/js/modes/sstv.js b/static/js/modes/sstv.js
index fa28af2..89f60c3 100644
--- a/static/js/modes/sstv.js
+++ b/static/js/modes/sstv.js
@@ -183,11 +183,11 @@ const SSTV = (function() {
Settings.registerMap(issMap);
} else {
// Fallback to dark theme tiles
- L.tileLayer('https://{s}.basemaps.cartocdn.com/dark_all/{z}/{x}/{y}{r}.png', {
- maxZoom: 19,
- className: 'tile-layer-cyan'
- }).addTo(issMap);
- }
+ L.tileLayer('https://{s}.basemaps.cartocdn.com/dark_all/{z}/{x}/{y}{r}.png', {
+ maxZoom: 19,
+ className: 'tile-layer-cyan'
+ }).addTo(issMap);
+ }
// Create ISS icon
const issIcon = L.divIcon({
@@ -491,7 +491,7 @@ const SSTV = (function() {
if (!data.available) {
updateStatusUI('unavailable', 'Decoder not installed');
- showStatusMessage('SSTV decoder not available. Install slowrx: apt install slowrx', 'warning');
+ showStatusMessage('SSTV decoder not available. Install numpy and Pillow: pip install numpy Pillow', 'warning');
return;
}
diff --git a/templates/index.html b/templates/index.html
index 30d1128..ed33f2a 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -14158,7 +14158,7 @@
Real-time ISS tracking map with ground track overlay
Next-pass countdown with elevation and duration predictions
Optional Doppler shift compensation for improved reception
- Requires: slowrx decoder + RTL-SDR
+ Requires: RTL-SDR (no external decoder needed - built-in Python SSTV decoder)
HF SSTV Mode
@@ -14168,7 +14168,7 @@
Supports USB, LSB, and FM demodulation modes
Auto-detects correct modulation when selecting a preset frequency
HF frequencies (below 30 MHz) require an upconverter with RTL-SDR
- Requires: slowrx decoder + RTL-SDR (+ upconverter for HF)
+ Requires: RTL-SDR (+ upconverter for HF, no external decoder needed)
diff --git a/tests/test_sstv_decoder.py b/tests/test_sstv_decoder.py
new file mode 100644
index 0000000..ead653d
--- /dev/null
+++ b/tests/test_sstv_decoder.py
@@ -0,0 +1,798 @@
+"""Tests for the pure-Python SSTV decoder.
+
+Covers VIS detection, Goertzel accuracy, mode specs, synthetic image
+decoding, and integration with the SSTVDecoder orchestrator.
+"""
+
+from __future__ import annotations
+
+import math
+import tempfile
+import wave
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import numpy as np
+import pytest
+
+from utils.sstv.constants import (
+ FREQ_BLACK,
+ FREQ_LEADER,
+ FREQ_PIXEL_HIGH,
+ FREQ_PIXEL_LOW,
+ FREQ_SYNC,
+ FREQ_VIS_BIT_0,
+ FREQ_VIS_BIT_1,
+ FREQ_WHITE,
+ SAMPLE_RATE,
+)
+from utils.sstv.dsp import (
+ estimate_frequency,
+ freq_to_pixel,
+ goertzel,
+ goertzel_batch,
+ goertzel_mag,
+ normalize_audio,
+ samples_for_duration,
+)
+from utils.sstv.modes import (
+ ALL_MODES,
+ MARTIN_1,
+ PD_120,
+ PD_180,
+ ROBOT_36,
+ ROBOT_72,
+ SCOTTIE_1,
+ ColorModel,
+ SyncPosition,
+ get_mode,
+ get_mode_by_name,
+)
+from utils.sstv.sstv_decoder import (
+ DecodeProgress,
+ DopplerInfo,
+ SSTVDecoder,
+ SSTVImage,
+ get_sstv_decoder,
+ is_sstv_available,
+)
+from utils.sstv.vis import VISDetector, VISState
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def generate_tone(freq: float, duration_s: float,
+ sample_rate: int = SAMPLE_RATE,
+ amplitude: float = 0.8) -> np.ndarray:
+ """Generate a pure sine tone."""
+ t = np.arange(int(duration_s * sample_rate)) / sample_rate
+ return amplitude * np.sin(2 * np.pi * freq * t)
+
+
+def generate_vis_header(vis_code: int, sample_rate: int = SAMPLE_RATE) -> np.ndarray:
+ """Generate a synthetic VIS header for a given code.
+
+ Structure: leader1 (300ms) + break (10ms) + leader2 (300ms)
+ + start_bit (30ms) + 8 data bits (30ms each)
+ + parity bit (30ms) + stop_bit (30ms)
+ """
+ parts = []
+
+ # Leader 1 (1900 Hz, 300ms)
+ parts.append(generate_tone(FREQ_LEADER, 0.300, sample_rate))
+
+ # Break (1200 Hz, 10ms)
+ parts.append(generate_tone(FREQ_SYNC, 0.010, sample_rate))
+
+ # Leader 2 (1900 Hz, 300ms)
+ parts.append(generate_tone(FREQ_LEADER, 0.300, sample_rate))
+
+ # Start bit (1200 Hz, 30ms)
+ parts.append(generate_tone(FREQ_SYNC, 0.030, sample_rate))
+
+ # 8 data bits (LSB first)
+ ones_count = 0
+ for i in range(8):
+ bit = (vis_code >> i) & 1
+ if bit:
+ ones_count += 1
+ parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030, sample_rate))
+ else:
+ parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030, sample_rate))
+
+ # Even parity bit
+ parity = ones_count % 2
+ if parity:
+ parts.append(generate_tone(FREQ_VIS_BIT_1, 0.030, sample_rate))
+ else:
+ parts.append(generate_tone(FREQ_VIS_BIT_0, 0.030, sample_rate))
+
+ # Stop bit (1200 Hz, 30ms)
+ parts.append(generate_tone(FREQ_SYNC, 0.030, sample_rate))
+
+ return np.concatenate(parts)
+
+
+# ---------------------------------------------------------------------------
+# Goertzel / DSP tests
+# ---------------------------------------------------------------------------
+
+class TestGoertzel:
+ """Tests for the Goertzel algorithm."""
+
+ def test_detects_exact_frequency(self):
+ """Goertzel should have peak energy at the generated frequency."""
+ tone = generate_tone(1200.0, 0.01)
+ energy_1200 = goertzel(tone, 1200.0)
+ energy_1500 = goertzel(tone, 1500.0)
+ energy_1900 = goertzel(tone, 1900.0)
+
+ assert energy_1200 > energy_1500 * 5
+ assert energy_1200 > energy_1900 * 5
+
+ def test_different_frequencies(self):
+ """Each candidate frequency should produce peak at its own freq."""
+ for freq in [1100, 1200, 1300, 1500, 1900, 2300]:
+ tone = generate_tone(float(freq), 0.01)
+ energy = goertzel(tone, float(freq))
+ # Should have significant energy at the target
+ assert energy > 0
+
+ def test_empty_samples(self):
+ """Goertzel on empty array should return 0."""
+ assert goertzel(np.array([], dtype=np.float64), 1200.0) == 0.0
+
+ def test_goertzel_mag(self):
+ """goertzel_mag should return sqrt of energy."""
+ tone = generate_tone(1200.0, 0.01)
+ energy = goertzel(tone, 1200.0)
+ mag = goertzel_mag(tone, 1200.0)
+ assert abs(mag - math.sqrt(energy)) < 1e-10
+
+
+class TestEstimateFrequency:
+ """Tests for frequency estimation."""
+
+ def test_estimates_known_frequency(self):
+ """Should accurately estimate a known tone frequency."""
+ tone = generate_tone(1900.0, 0.02)
+ estimated = estimate_frequency(tone, 1000.0, 2500.0)
+ assert abs(estimated - 1900.0) <= 30.0
+
+ def test_estimates_black_level(self):
+ """Should detect the black level frequency."""
+ tone = generate_tone(FREQ_BLACK, 0.02)
+ estimated = estimate_frequency(tone, 1400.0, 1600.0)
+ assert abs(estimated - FREQ_BLACK) <= 30.0
+
+ def test_estimates_white_level(self):
+ """Should detect the white level frequency."""
+ tone = generate_tone(FREQ_WHITE, 0.02)
+ estimated = estimate_frequency(tone, 2200.0, 2400.0)
+ assert abs(estimated - FREQ_WHITE) <= 30.0
+
+ def test_empty_samples(self):
+ """Should return 0 for empty input."""
+ assert estimate_frequency(np.array([], dtype=np.float64)) == 0.0
+
+
+class TestFreqToPixel:
+ """Tests for frequency-to-pixel mapping."""
+
+ def test_black_level(self):
+ """1500 Hz should map to 0 (black)."""
+ assert freq_to_pixel(FREQ_PIXEL_LOW) == 0
+
+ def test_white_level(self):
+ """2300 Hz should map to 255 (white)."""
+ assert freq_to_pixel(FREQ_PIXEL_HIGH) == 255
+
+ def test_midpoint(self):
+ """Middle frequency should map to approximately 128."""
+ mid_freq = (FREQ_PIXEL_LOW + FREQ_PIXEL_HIGH) / 2
+ pixel = freq_to_pixel(mid_freq)
+ assert 120 <= pixel <= 135
+
+ def test_below_black_clamps(self):
+ """Frequencies below black level should clamp to 0."""
+ assert freq_to_pixel(1000.0) == 0
+
+ def test_above_white_clamps(self):
+ """Frequencies above white level should clamp to 255."""
+ assert freq_to_pixel(3000.0) == 255
+
+
+class TestNormalizeAudio:
+ """Tests for int16 to float64 normalization."""
+
+ def test_max_positive(self):
+ """int16 max should normalize to ~1.0."""
+ raw = np.array([32767], dtype=np.int16)
+ result = normalize_audio(raw)
+ assert abs(result[0] - (32767.0 / 32768.0)) < 1e-10
+
+ def test_zero(self):
+ """int16 zero should normalize to 0.0."""
+ raw = np.array([0], dtype=np.int16)
+ result = normalize_audio(raw)
+ assert result[0] == 0.0
+
+ def test_negative(self):
+ """int16 min should normalize to -1.0."""
+ raw = np.array([-32768], dtype=np.int16)
+ result = normalize_audio(raw)
+ assert result[0] == -1.0
+
+
+class TestSamplesForDuration:
+ """Tests for duration-to-samples calculation."""
+
+ def test_one_second(self):
+ """1 second at 48kHz should be 48000 samples."""
+ assert samples_for_duration(1.0) == 48000
+
+ def test_five_ms(self):
+ """5ms at 48kHz should be 240 samples."""
+ assert samples_for_duration(0.005) == 240
+
+ def test_custom_rate(self):
+ """Should work with custom sample rates."""
+ assert samples_for_duration(1.0, 22050) == 22050
+
+
+class TestGoertzelBatch:
+ """Tests for the vectorized batch Goertzel function."""
+
+ def test_matches_scalar_goertzel(self):
+ """Batch result should match individual goertzel calls."""
+ rng = np.random.default_rng(42)
+ # 10 pixel windows of 20 samples each
+ audio_matrix = rng.standard_normal((10, 20))
+ freqs = np.array([1200.0, 1500.0, 1900.0, 2300.0])
+
+ batch_result = goertzel_batch(audio_matrix, freqs)
+ assert batch_result.shape == (10, 4)
+
+ for i in range(10):
+ for j, f in enumerate(freqs):
+ scalar = goertzel(audio_matrix[i], f)
+ assert abs(batch_result[i, j] - scalar) < 1e-6, \
+ f"Mismatch at pixel {i}, freq {f}"
+
+ def test_detects_correct_frequency(self):
+ """Batch should find peak at the correct frequency for each pixel.
+
+ Uses 96-sample windows (2ms at 48kHz) matching the decoder's
+ minimum analysis window, with 5Hz resolution.
+ """
+ freqs = np.arange(1400.0, 2405.0, 5.0) # 5Hz step, same as decoder
+ window_size = 96 # Matches _MIN_ANALYSIS_WINDOW
+ pixels = []
+ for target in [1500.0, 1900.0, 2300.0]:
+ t = np.arange(window_size) / SAMPLE_RATE
+ pixels.append(0.8 * np.sin(2 * np.pi * target * t))
+ audio_matrix = np.array(pixels)
+
+ energies = goertzel_batch(audio_matrix, freqs)
+ best_idx = np.argmax(energies, axis=1)
+ best_freqs = freqs[best_idx]
+
+ # With 96 samples, frequency accuracy is within ~25 Hz
+ assert abs(best_freqs[0] - 1500.0) <= 30.0
+ assert abs(best_freqs[1] - 1900.0) <= 30.0
+ assert abs(best_freqs[2] - 2300.0) <= 30.0
+
+ def test_empty_input(self):
+ """Should handle empty inputs gracefully."""
+ result = goertzel_batch(np.zeros((0, 10)), np.array([1200.0]))
+ assert result.shape == (0, 1)
+
+ result = goertzel_batch(np.zeros((5, 10)), np.array([]))
+ assert result.shape == (5, 0)
+
+
+# ---------------------------------------------------------------------------
+# VIS detection tests
+# ---------------------------------------------------------------------------
+
+class TestVISDetector:
+ """Tests for VIS header detection."""
+
+ def test_initial_state(self):
+ """Detector should start in IDLE state."""
+ detector = VISDetector()
+ assert detector.state == VISState.IDLE
+
+ def test_reset(self):
+ """Reset should return to IDLE state."""
+ detector = VISDetector()
+ # Feed some leader tone to change state
+ detector.feed(generate_tone(FREQ_LEADER, 0.250))
+ detector.reset()
+ assert detector.state == VISState.IDLE
+
+ def test_detect_robot36(self):
+ """Should detect Robot36 VIS code (8)."""
+ detector = VISDetector()
+ header = generate_vis_header(8) # Robot36
+ # Add some silence before and after
+ audio = np.concatenate([
+ np.zeros(2400),
+ header,
+ np.zeros(2400),
+ ])
+
+ result = detector.feed(audio)
+ assert result is not None
+ vis_code, mode_name = result
+ assert vis_code == 8
+ assert mode_name == 'Robot36'
+
+ def test_detect_martin1(self):
+ """Should detect Martin1 VIS code (44)."""
+ detector = VISDetector()
+ header = generate_vis_header(44) # Martin1
+ audio = np.concatenate([np.zeros(2400), header, np.zeros(2400)])
+
+ result = detector.feed(audio)
+ assert result is not None
+ vis_code, mode_name = result
+ assert vis_code == 44
+ assert mode_name == 'Martin1'
+
+ def test_detect_scottie1(self):
+ """Should detect Scottie1 VIS code (60)."""
+ detector = VISDetector()
+ header = generate_vis_header(60) # Scottie1
+ audio = np.concatenate([np.zeros(2400), header, np.zeros(2400)])
+
+ result = detector.feed(audio)
+ assert result is not None
+ vis_code, mode_name = result
+ assert vis_code == 60
+ assert mode_name == 'Scottie1'
+
+ def test_detect_pd120(self):
+ """Should detect PD120 VIS code (93)."""
+ detector = VISDetector()
+ header = generate_vis_header(93) # PD120
+ audio = np.concatenate([np.zeros(2400), header, np.zeros(2400)])
+
+ result = detector.feed(audio)
+ assert result is not None
+ vis_code, mode_name = result
+ assert vis_code == 93
+ assert mode_name == 'PD120'
+
+ def test_noise_rejection(self):
+ """Should not falsely detect VIS in noise."""
+ detector = VISDetector()
+ rng = np.random.default_rng(42)
+ noise = rng.standard_normal(48000) * 0.1 # 1 second of noise
+ result = detector.feed(noise)
+ assert result is None
+
+ def test_incremental_feeding(self):
+ """Should work with small chunks fed incrementally."""
+ detector = VISDetector()
+ header = generate_vis_header(8)
+ audio = np.concatenate([np.zeros(2400), header, np.zeros(2400)])
+
+ # Feed in small chunks (100 samples each)
+ chunk_size = 100
+ result = None
+ offset = 0
+ while offset < len(audio):
+ chunk = audio[offset:offset + chunk_size]
+ offset += chunk_size
+ result = detector.feed(chunk)
+ if result is not None:
+ break
+
+ assert result is not None
+ vis_code, mode_name = result
+ assert vis_code == 8
+ assert mode_name == 'Robot36'
+
+
+# ---------------------------------------------------------------------------
+# Mode spec tests
+# ---------------------------------------------------------------------------
+
+class TestModes:
+ """Tests for SSTV mode specifications."""
+
+ def test_all_vis_codes_have_modes(self):
+ """All defined VIS codes should have matching mode specs."""
+ for vis_code in [8, 12, 44, 40, 60, 56, 93, 95]:
+ mode = get_mode(vis_code)
+ assert mode is not None, f"No mode for VIS code {vis_code}"
+
+ def test_robot36_spec(self):
+ """Robot36 should have correct dimensions and timing."""
+ assert ROBOT_36.width == 320
+ assert ROBOT_36.height == 240
+ assert ROBOT_36.vis_code == 8
+ assert ROBOT_36.color_model == ColorModel.YCRCB
+ assert ROBOT_36.has_half_rate_chroma is True
+ assert ROBOT_36.sync_position == SyncPosition.FRONT
+
+ def test_martin1_spec(self):
+ """Martin1 should have correct dimensions."""
+ assert MARTIN_1.width == 320
+ assert MARTIN_1.height == 256
+ assert MARTIN_1.vis_code == 44
+ assert MARTIN_1.color_model == ColorModel.RGB
+ assert len(MARTIN_1.channels) == 3
+
+ def test_scottie1_spec(self):
+ """Scottie1 should have middle sync position."""
+ assert SCOTTIE_1.sync_position == SyncPosition.MIDDLE
+ assert SCOTTIE_1.width == 320
+ assert SCOTTIE_1.height == 256
+
+ def test_pd120_spec(self):
+ """PD120 should have dual-luminance YCrCb."""
+ assert PD_120.width == 640
+ assert PD_120.height == 496
+ assert PD_120.color_model == ColorModel.YCRCB_DUAL
+ assert len(PD_120.channels) == 4 # Y1, Cr, Cb, Y2
+
+ def test_get_mode_unknown(self):
+ """Unknown VIS code should return None."""
+ assert get_mode(999) is None
+
+ def test_get_mode_by_name(self):
+ """Should look up modes by name."""
+ mode = get_mode_by_name('Robot36')
+ assert mode is not None
+ assert mode.vis_code == 8
+
+ def test_mode_by_name_unknown(self):
+ """Unknown mode name should return None."""
+ assert get_mode_by_name('FakeMode') is None
+
+ def test_robot72_spec(self):
+ """Robot72 should have 3 channels and full-rate chroma."""
+ assert ROBOT_72.width == 320
+ assert ROBOT_72.height == 240
+ assert ROBOT_72.vis_code == 12
+ assert ROBOT_72.color_model == ColorModel.YCRCB
+ assert ROBOT_72.has_half_rate_chroma is False
+ assert len(ROBOT_72.channels) == 3 # Y, Cr, Cb
+ assert ROBOT_72.channel_separator_ms == 6.0
+
+ def test_robot36_separator(self):
+ """Robot36 should have a 6ms separator between Y and chroma."""
+ assert ROBOT_36.channel_separator_ms == 6.0
+ assert ROBOT_36.has_half_rate_chroma is True
+ assert len(ROBOT_36.channels) == 2 # Y, alternating Cr/Cb
+
+ def test_pd120_channel_timings(self):
+ """PD120 channel durations should sum to line_duration minus sync+porch."""
+ channel_sum = sum(ch.duration_ms for ch in PD_120.channels)
+ expected = PD_120.line_duration_ms - PD_120.sync_duration_ms - PD_120.sync_porch_ms
+ assert abs(channel_sum - expected) < 0.1, \
+ f"PD120 channels sum to {channel_sum}ms, expected {expected}ms"
+
+ def test_pd180_channel_timings(self):
+ """PD180 channel durations should sum to line_duration minus sync+porch."""
+ channel_sum = sum(ch.duration_ms for ch in PD_180.channels)
+ expected = PD_180.line_duration_ms - PD_180.sync_duration_ms - PD_180.sync_porch_ms
+ assert abs(channel_sum - expected) < 0.1, \
+ f"PD180 channels sum to {channel_sum}ms, expected {expected}ms"
+
+ def test_robot36_timing_consistency(self):
+ """Robot36 total channel + sync + porch + separator should equal line_duration."""
+ total = (ROBOT_36.sync_duration_ms + ROBOT_36.sync_porch_ms
+ + sum(ch.duration_ms for ch in ROBOT_36.channels)
+ + ROBOT_36.channel_separator_ms) # 1 separator for 2 channels
+ assert abs(total - ROBOT_36.line_duration_ms) < 0.1
+
+ def test_robot72_timing_consistency(self):
+ """Robot72 total should equal line_duration."""
+ # 3 channels with 2 separators
+ total = (ROBOT_72.sync_duration_ms + ROBOT_72.sync_porch_ms
+ + sum(ch.duration_ms for ch in ROBOT_72.channels)
+ + ROBOT_72.channel_separator_ms * 2)
+ assert abs(total - ROBOT_72.line_duration_ms) < 0.1
+
+ def test_all_modes_have_positive_dimensions(self):
+ """All modes should have positive width and height."""
+ for _vis_code, mode in ALL_MODES.items():
+ assert mode.width > 0, f"{mode.name} has invalid width"
+ assert mode.height > 0, f"{mode.name} has invalid height"
+ assert mode.line_duration_ms > 0, f"{mode.name} has invalid line duration"
+
+
+# ---------------------------------------------------------------------------
+# Image decoder tests
+# ---------------------------------------------------------------------------
+
+class TestImageDecoder:
+ """Tests for the SSTV image decoder."""
+
+ def test_creates_decoder(self):
+ """Should create an image decoder for any supported mode."""
+ from utils.sstv.image_decoder import SSTVImageDecoder
+ decoder = SSTVImageDecoder(ROBOT_36)
+ assert decoder.is_complete is False
+ assert decoder.current_line == 0
+ assert decoder.total_lines == 240
+
+ def test_pd120_dual_luminance_lines(self):
+ """PD120 decoder should expect half the image height in audio lines."""
+ from utils.sstv.image_decoder import SSTVImageDecoder
+ decoder = SSTVImageDecoder(PD_120)
+ assert decoder.total_lines == 248 # 496 / 2
+
+ def test_progress_percent(self):
+ """Progress should start at 0."""
+ from utils.sstv.image_decoder import SSTVImageDecoder
+ decoder = SSTVImageDecoder(ROBOT_36)
+ assert decoder.progress_percent == 0
+
+ def test_synthetic_robot36_decode(self):
+ """Should decode a synthetic Robot36 image (all white)."""
+ pytest.importorskip('PIL')
+ from utils.sstv.image_decoder import SSTVImageDecoder
+
+ decoder = SSTVImageDecoder(ROBOT_36)
+
+ # Generate synthetic scanlines (all white = 2300 Hz)
+ # Each line: sync(9ms) + porch(3ms) + Y(88ms) + separator(6ms) + Cr/Cb(44ms)
+ for _line in range(240):
+ parts = []
+ # Sync pulse
+ parts.append(generate_tone(FREQ_SYNC, 0.009))
+ # Porch
+ parts.append(generate_tone(FREQ_BLACK, 0.003))
+ # Y channel (white = 2300 Hz)
+ parts.append(generate_tone(FREQ_WHITE, 0.088))
+ # Separator + porch (6ms)
+ parts.append(generate_tone(FREQ_BLACK, 0.006))
+ # Chroma channel (mid value = 1900 Hz ~ 128)
+ parts.append(generate_tone(1900.0, 0.044))
+ # Pad to line duration
+ line_audio = np.concatenate(parts)
+ line_samples = samples_for_duration(ROBOT_36.line_duration_ms / 1000.0)
+ if len(line_audio) < line_samples:
+ line_audio = np.concatenate([
+ line_audio,
+ np.zeros(line_samples - len(line_audio))
+ ])
+
+ decoder.feed(line_audio)
+
+ assert decoder.is_complete
+ img = decoder.get_image()
+ assert img is not None
+ assert img.size == (320, 240)
+
+
+# ---------------------------------------------------------------------------
+# SSTVDecoder orchestrator tests
+# ---------------------------------------------------------------------------
+
+class TestSSTVDecoder:
+ """Tests for the SSTVDecoder orchestrator."""
+
+ def test_decoder_available(self):
+ """Python decoder should always be available."""
+ decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
+ assert decoder.decoder_available == 'python-sstv'
+
+ def test_is_sstv_available(self):
+ """is_sstv_available() should always return True."""
+ assert is_sstv_available() is True
+
+ def test_not_running_initially(self):
+ """Decoder should not be running on creation."""
+ decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
+ assert decoder.is_running is False
+
+ def test_doppler_disabled_by_default(self):
+ """Doppler should be disabled by default."""
+ decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
+ assert decoder.doppler_enabled is False
+ assert decoder.last_doppler_info is None
+
+ def test_stop_when_not_running(self):
+ """Stop should be safe to call when not running."""
+ decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
+ decoder.stop() # Should not raise
+
+ def test_set_callback(self):
+ """Should accept a callback function."""
+ decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
+ cb = MagicMock()
+ decoder.set_callback(cb)
+ # Trigger a progress emit
+ decoder._emit_progress(DecodeProgress(status='detecting'))
+ cb.assert_called_once()
+
+ def test_get_images_empty(self):
+ """Should return empty list initially."""
+ decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
+ images = decoder.get_images()
+ assert images == []
+
+ def test_decode_file_not_found(self):
+ """Should raise FileNotFoundError for missing file."""
+ decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
+ with pytest.raises(FileNotFoundError):
+ decoder.decode_file('/nonexistent/audio.wav')
+
+ def test_decode_file_with_synthetic_wav(self):
+ """Should process a WAV file through the decode pipeline."""
+ pytest.importorskip('PIL')
+
+ output_dir = tempfile.mkdtemp()
+ decoder = SSTVDecoder(output_dir=output_dir)
+
+ # Generate a synthetic WAV with a VIS header + short image data
+ vis_header = generate_vis_header(8) # Robot36
+
+ # Add 240 lines of image data after the header
+ image_lines = []
+ for _line in range(240):
+ parts = []
+ parts.append(generate_tone(FREQ_SYNC, 0.009))
+ parts.append(generate_tone(FREQ_BLACK, 0.003))
+ parts.append(generate_tone(1900.0, 0.088)) # mid-gray Y
+ parts.append(generate_tone(FREQ_BLACK, 0.006)) # separator
+ parts.append(generate_tone(1900.0, 0.044)) # chroma
+ line_audio = np.concatenate(parts)
+ line_samples = samples_for_duration(ROBOT_36.line_duration_ms / 1000.0)
+ if len(line_audio) < line_samples:
+ line_audio = np.concatenate([
+ line_audio,
+ np.zeros(line_samples - len(line_audio))
+ ])
+ image_lines.append(line_audio)
+
+ audio = np.concatenate([
+ np.zeros(4800), # 100ms silence
+ vis_header,
+ *image_lines,
+ np.zeros(4800),
+ ])
+
+ # Write WAV file
+ wav_path = Path(output_dir) / 'test_input.wav'
+ raw_int16 = (audio * 32767).astype(np.int16)
+ with wave.open(str(wav_path), 'wb') as wf:
+ wf.setnchannels(1)
+ wf.setsampwidth(2)
+ wf.setframerate(SAMPLE_RATE)
+ wf.writeframes(raw_int16.tobytes())
+
+ images = decoder.decode_file(wav_path)
+ assert len(images) >= 1
+ assert images[0].mode == 'Robot36'
+ assert Path(images[0].path).exists()
+
+
+# ---------------------------------------------------------------------------
+# Dataclass tests
+# ---------------------------------------------------------------------------
+
+class TestDataclasses:
+ """Tests for dataclass serialization."""
+
+ def test_decode_progress_to_dict(self):
+ """DecodeProgress should serialize correctly."""
+ progress = DecodeProgress(
+ status='decoding',
+ mode='Robot36',
+ progress_percent=50,
+ message='Halfway done',
+ )
+ d = progress.to_dict()
+ assert d['type'] == 'sstv_progress'
+ assert d['status'] == 'decoding'
+ assert d['mode'] == 'Robot36'
+ assert d['progress'] == 50
+ assert d['message'] == 'Halfway done'
+
+ def test_decode_progress_minimal(self):
+ """DecodeProgress with only status should omit optional fields."""
+ progress = DecodeProgress(status='detecting')
+ d = progress.to_dict()
+ assert 'mode' not in d
+ assert 'message' not in d
+ assert 'image' not in d
+
+ def test_sstv_image_to_dict(self):
+ """SSTVImage should serialize with URL."""
+ from datetime import datetime, timezone
+ image = SSTVImage(
+ filename='test.png',
+ path=Path('/tmp/test.png'),
+ mode='Robot36',
+ timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ frequency=145.800,
+ size_bytes=1234,
+ )
+ d = image.to_dict()
+ assert d['filename'] == 'test.png'
+ assert d['mode'] == 'Robot36'
+ assert d['url'] == '/sstv/images/test.png'
+
+ def test_doppler_info_to_dict(self):
+ """DopplerInfo should serialize with rounding."""
+ from datetime import datetime, timezone
+ info = DopplerInfo(
+ frequency_hz=145800123.456,
+ shift_hz=123.456,
+ range_rate_km_s=-1.23456,
+ elevation=45.678,
+ azimuth=180.123,
+ timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ )
+ d = info.to_dict()
+ assert d['shift_hz'] == 123.5
+ assert d['range_rate_km_s'] == -1.235
+ assert d['elevation'] == 45.7
+
+
+# ---------------------------------------------------------------------------
+# Integration tests
+# ---------------------------------------------------------------------------
+
+class TestIntegration:
+ """Integration tests verifying the package works as a drop-in replacement."""
+
+ def test_import_from_utils_sstv(self):
+ """Routes should be able to import from utils.sstv."""
+ from utils.sstv import (
+ ISS_SSTV_FREQ,
+ is_sstv_available,
+ )
+ assert ISS_SSTV_FREQ == 145.800
+ assert is_sstv_available() is True
+
+ def test_sstv_modes_constant(self):
+ """SSTV_MODES list should be importable."""
+ from utils.sstv import SSTV_MODES
+ assert 'Robot36' in SSTV_MODES
+ assert 'Martin1' in SSTV_MODES
+ assert 'PD120' in SSTV_MODES
+
+ def test_decoder_singleton(self):
+ """get_sstv_decoder should return a valid decoder."""
+ # Reset the global singleton for test isolation
+ import utils.sstv.sstv_decoder as mod
+ old = mod._decoder
+ mod._decoder = None
+ try:
+ decoder = get_sstv_decoder()
+ assert decoder is not None
+ assert decoder.decoder_available == 'python-sstv'
+ finally:
+ mod._decoder = old
+
+ @patch('subprocess.Popen')
+ def test_start_creates_subprocess(self, mock_popen):
+ """start() should create an rtl_fm subprocess."""
+ mock_process = MagicMock()
+ mock_process.stdout = MagicMock()
+ mock_process.stdout.read = MagicMock(return_value=b'')
+ mock_process.stderr = MagicMock()
+ mock_popen.return_value = mock_process
+
+ decoder = SSTVDecoder(output_dir=tempfile.mkdtemp())
+ success = decoder.start(frequency=145.800, device_index=0)
+ assert success is True
+ assert decoder.is_running is True
+
+ # Verify rtl_fm was called
+ mock_popen.assert_called_once()
+ cmd = mock_popen.call_args[0][0]
+ assert cmd[0] == 'rtl_fm'
+ assert '-f' in cmd
+ assert '-M' in cmd
+
+ decoder.stop()
+ assert decoder.is_running is False
diff --git a/utils/sstv/__init__.py b/utils/sstv/__init__.py
new file mode 100644
index 0000000..092d9a8
--- /dev/null
+++ b/utils/sstv/__init__.py
@@ -0,0 +1,33 @@
+"""SSTV (Slow-Scan Television) decoder package.
+
+Pure Python SSTV decoder using Goertzel-based DSP for VIS header detection
+and scanline-by-scanline image decoding. Supports Robot36/72, Martin1/2,
+Scottie1/2, and PD120/180 modes.
+
+Replaces the external slowrx dependency with numpy/scipy + Pillow.
+"""
+
+from .constants import ISS_SSTV_FREQ, SSTV_MODES
+from .sstv_decoder import (
+ DecodeProgress,
+ DopplerInfo,
+ DopplerTracker,
+ SSTVDecoder,
+ SSTVImage,
+ get_general_sstv_decoder,
+ get_sstv_decoder,
+ is_sstv_available,
+)
+
+__all__ = [
+ 'DecodeProgress',
+ 'DopplerInfo',
+ 'DopplerTracker',
+ 'ISS_SSTV_FREQ',
+ 'SSTV_MODES',
+ 'SSTVDecoder',
+ 'SSTVImage',
+ 'get_general_sstv_decoder',
+ 'get_sstv_decoder',
+ 'is_sstv_available',
+]
diff --git a/utils/sstv/constants.py b/utils/sstv/constants.py
new file mode 100644
index 0000000..e4d56d9
--- /dev/null
+++ b/utils/sstv/constants.py
@@ -0,0 +1,92 @@
+"""SSTV protocol constants.
+
+VIS (Vertical Interval Signaling) codes, frequency assignments, and timing
+constants for all supported SSTV modes per the SSTV protocol specification.
+"""
+
+from __future__ import annotations
+
+# ---------------------------------------------------------------------------
+# Audio / DSP
+# ---------------------------------------------------------------------------
+SAMPLE_RATE = 48000 # Hz - standard audio sample rate used by rtl_fm
+
+# Window size for Goertzel tone detection (5 ms at 48 kHz = 240 samples)
+GOERTZEL_WINDOW = 240
+
+# Chunk size for reading from rtl_fm (100 ms = 4800 samples)
+STREAM_CHUNK_SAMPLES = 4800
+
+# ---------------------------------------------------------------------------
+# SSTV tone frequencies (Hz)
+# ---------------------------------------------------------------------------
+FREQ_VIS_BIT_1 = 1100 # VIS logic 1
+FREQ_SYNC = 1200 # Horizontal sync pulse
+FREQ_VIS_BIT_0 = 1300 # VIS logic 0
+FREQ_BREAK = 1200 # Break tone in VIS header (same as sync)
+FREQ_LEADER = 1900 # Leader / calibration tone
+FREQ_BLACK = 1500 # Black level
+FREQ_WHITE = 2300 # White level
+
+# Pixel luminance mapping range
+FREQ_PIXEL_LOW = 1500 # 0 luminance
+FREQ_PIXEL_HIGH = 2300 # 255 luminance
+
+# Frequency tolerance for tone detection (Hz)
+FREQ_TOLERANCE = 50
+
+# ---------------------------------------------------------------------------
+# VIS header timing (seconds)
+# ---------------------------------------------------------------------------
+VIS_LEADER_MIN = 0.200 # Minimum leader tone duration
+VIS_LEADER_MAX = 0.500 # Maximum leader tone duration
+VIS_LEADER_NOMINAL = 0.300 # Nominal leader tone duration
+VIS_BREAK_DURATION = 0.010 # Break pulse duration (10 ms)
+VIS_BIT_DURATION = 0.030 # Each VIS data bit (30 ms)
+VIS_START_BIT_DURATION = 0.030 # Start bit (30 ms)
+VIS_STOP_BIT_DURATION = 0.030 # Stop bit (30 ms)
+
+# Timing tolerance for VIS detection
+VIS_TIMING_TOLERANCE = 0.5 # 50% tolerance on durations
+
+# ---------------------------------------------------------------------------
+# VIS code → mode name mapping
+# ---------------------------------------------------------------------------
+VIS_CODES: dict[int, str] = {
+ 8: 'Robot36',
+ 12: 'Robot72',
+ 44: 'Martin1',
+ 40: 'Martin2',
+ 60: 'Scottie1',
+ 56: 'Scottie2',
+ 93: 'PD120',
+ 95: 'PD180',
+ # Less common but recognized
+ 4: 'Robot24',
+ 36: 'Martin3',
+ 52: 'Scottie3',
+ 55: 'ScottieDX',
+ 113: 'PD240',
+ 96: 'PD90',
+ 98: 'PD160',
+}
+
+# Reverse mapping: mode name → VIS code
+MODE_TO_VIS: dict[str, int] = {v: k for k, v in VIS_CODES.items()}
+
+# ---------------------------------------------------------------------------
+# Common SSTV modes list (for UI / status)
+# ---------------------------------------------------------------------------
+SSTV_MODES = [
+ 'PD120', 'PD180', 'Martin1', 'Martin2',
+ 'Scottie1', 'Scottie2', 'Robot36', 'Robot72',
+]
+
+# ISS SSTV frequency
+ISS_SSTV_FREQ = 145.800 # MHz
+
+# Speed of light in m/s
+SPEED_OF_LIGHT = 299_792_458
+
+# Minimum energy ratio for valid tone detection (vs noise floor)
+MIN_ENERGY_RATIO = 5.0
diff --git a/utils/sstv/dsp.py b/utils/sstv/dsp.py
new file mode 100644
index 0000000..3c1782a
--- /dev/null
+++ b/utils/sstv/dsp.py
@@ -0,0 +1,232 @@
+"""DSP utilities for SSTV decoding.
+
+Goertzel algorithm for efficient single-frequency energy detection,
+frequency estimation, and frequency-to-pixel luminance mapping.
+"""
+
+from __future__ import annotations
+
+import math
+
+import numpy as np
+
+from .constants import (
+ FREQ_PIXEL_HIGH,
+ FREQ_PIXEL_LOW,
+ MIN_ENERGY_RATIO,
+ SAMPLE_RATE,
+)
+
+
+def goertzel(samples: np.ndarray, target_freq: float,
+ sample_rate: int = SAMPLE_RATE) -> float:
+ """Compute Goertzel energy at a single target frequency.
+
+ O(N) per frequency - more efficient than FFT when only a few
+ frequencies are needed.
+
+ Args:
+ samples: Audio samples (float64, -1.0 to 1.0).
+ target_freq: Frequency to detect (Hz).
+ sample_rate: Sample rate (Hz).
+
+ Returns:
+ Magnitude squared (energy) at the target frequency.
+ """
+ n = len(samples)
+ if n == 0:
+ return 0.0
+
+ # Generalized Goertzel (DTFT): use exact target frequency rather than
+ # rounding to the nearest DFT bin. This is critical for short windows
+ # (e.g. 13 samples/pixel) where integer-k Goertzel quantizes all SSTV
+ # pixel frequencies into 1-2 bins, making estimation impossible.
+ w = 2.0 * math.pi * target_freq / sample_rate
+ coeff = 2.0 * math.cos(w)
+
+ s0 = 0.0
+ s1 = 0.0
+ s2 = 0.0
+
+ for sample in samples:
+ s0 = sample + coeff * s1 - s2
+ s2 = s1
+ s1 = s0
+
+ return s1 * s1 + s2 * s2 - coeff * s1 * s2
+
+
+def goertzel_mag(samples: np.ndarray, target_freq: float,
+ sample_rate: int = SAMPLE_RATE) -> float:
+ """Compute Goertzel magnitude (square root of energy).
+
+ Args:
+ samples: Audio samples.
+ target_freq: Frequency to detect (Hz).
+ sample_rate: Sample rate (Hz).
+
+ Returns:
+ Magnitude at the target frequency.
+ """
+ return math.sqrt(max(0.0, goertzel(samples, target_freq, sample_rate)))
+
+
+def detect_tone(samples: np.ndarray, candidates: list[float],
+ sample_rate: int = SAMPLE_RATE) -> tuple[float | None, float]:
+ """Detect which candidate frequency has the strongest energy.
+
+ Args:
+ samples: Audio samples.
+ candidates: List of candidate frequencies (Hz).
+ sample_rate: Sample rate (Hz).
+
+ Returns:
+ Tuple of (detected_frequency or None, energy_ratio).
+ Returns None if no tone significantly dominates.
+ """
+ if len(samples) == 0 or not candidates:
+ return None, 0.0
+
+ energies = {f: goertzel(samples, f, sample_rate) for f in candidates}
+ max_freq = max(energies, key=energies.get) # type: ignore[arg-type]
+ max_energy = energies[max_freq]
+
+ if max_energy <= 0:
+ return None, 0.0
+
+ # Calculate ratio of strongest to average of others
+ others = [e for f, e in energies.items() if f != max_freq]
+ avg_others = sum(others) / len(others) if others else 0.0
+
+ ratio = max_energy / avg_others if avg_others > 0 else float('inf')
+
+ if ratio >= MIN_ENERGY_RATIO:
+ return max_freq, ratio
+ return None, ratio
+
+
+def estimate_frequency(samples: np.ndarray, freq_low: float = 1000.0,
+ freq_high: float = 2500.0, step: float = 25.0,
+ sample_rate: int = SAMPLE_RATE) -> float:
+ """Estimate the dominant frequency in a range using Goertzel sweep.
+
+ Sweeps through frequencies in the given range and returns the one
+ with maximum energy. Uses a coarse sweep followed by a fine sweep
+ for accuracy.
+
+ Args:
+ samples: Audio samples.
+ freq_low: Lower bound of frequency range (Hz).
+ freq_high: Upper bound of frequency range (Hz).
+ step: Coarse step size (Hz).
+ sample_rate: Sample rate (Hz).
+
+ Returns:
+ Estimated dominant frequency (Hz).
+ """
+ if len(samples) == 0:
+ return 0.0
+
+ # Coarse sweep
+ best_freq = freq_low
+ best_energy = 0.0
+
+ freq = freq_low
+ while freq <= freq_high:
+ energy = goertzel(samples, freq, sample_rate)
+ if energy > best_energy:
+ best_energy = energy
+ best_freq = freq
+ freq += step
+
+ # Fine sweep around the coarse peak (+/- one step, 5 Hz resolution)
+ fine_low = max(freq_low, best_freq - step)
+ fine_high = min(freq_high, best_freq + step)
+ freq = fine_low
+ while freq <= fine_high:
+ energy = goertzel(samples, freq, sample_rate)
+ if energy > best_energy:
+ best_energy = energy
+ best_freq = freq
+ freq += 5.0
+
+ return best_freq
+
+
+def freq_to_pixel(frequency: float) -> int:
+ """Convert SSTV audio frequency to pixel luminance value (0-255).
+
+ Linear mapping: 1500 Hz = 0 (black), 2300 Hz = 255 (white).
+
+ Args:
+ frequency: Detected frequency (Hz).
+
+ Returns:
+ Pixel value clamped to 0-255.
+ """
+ normalized = (frequency - FREQ_PIXEL_LOW) / (FREQ_PIXEL_HIGH - FREQ_PIXEL_LOW)
+ return max(0, min(255, int(normalized * 255 + 0.5)))
+
+
+def samples_for_duration(duration_s: float,
+ sample_rate: int = SAMPLE_RATE) -> int:
+ """Calculate number of samples for a given duration.
+
+ Args:
+ duration_s: Duration in seconds.
+ sample_rate: Sample rate (Hz).
+
+ Returns:
+ Number of samples.
+ """
+ return int(duration_s * sample_rate + 0.5)
+
+
+def goertzel_batch(audio_matrix: np.ndarray, frequencies: np.ndarray,
+ sample_rate: int = SAMPLE_RATE) -> np.ndarray:
+ """Compute Goertzel energy for multiple audio segments at multiple frequencies.
+
+ Vectorized implementation using numpy broadcasting. Processes all
+ pixel windows and all candidate frequencies simultaneously, giving
+ roughly 50-100x speed-up over the scalar ``goertzel`` called in a
+ Python loop.
+
+ Args:
+ audio_matrix: Shape (M, N) – M audio segments of N samples each.
+ frequencies: 1-D array of F target frequencies in Hz.
+ sample_rate: Sample rate in Hz.
+
+ Returns:
+ Shape (M, F) array of energy values.
+ """
+ if audio_matrix.size == 0 or len(frequencies) == 0:
+ return np.zeros((audio_matrix.shape[0], len(frequencies)))
+
+ _M, N = audio_matrix.shape
+
+ # Generalized Goertzel (DTFT): exact target frequencies, no bin rounding
+ w = 2.0 * np.pi * frequencies / sample_rate
+ coeff = 2.0 * np.cos(w) # (F,)
+
+ s1 = np.zeros((audio_matrix.shape[0], len(frequencies)))
+ s2 = np.zeros_like(s1)
+
+ for n in range(N):
+ samples_n = audio_matrix[:, n:n + 1] # (M, 1) — broadcasts with (M, F)
+ s0 = samples_n + coeff * s1 - s2
+ s2 = s1
+ s1 = s0
+
+ return s1 * s1 + s2 * s2 - coeff * s1 * s2
+
+
+def normalize_audio(raw: np.ndarray) -> np.ndarray:
+ """Normalize int16 PCM audio to float64 in range [-1.0, 1.0].
+
+ Args:
+ raw: Raw int16 samples from rtl_fm.
+
+ Returns:
+ Float64 normalized samples.
+ """
+ return raw.astype(np.float64) / 32768.0
diff --git a/utils/sstv/image_decoder.py b/utils/sstv/image_decoder.py
new file mode 100644
index 0000000..13a9eda
--- /dev/null
+++ b/utils/sstv/image_decoder.py
@@ -0,0 +1,453 @@
+"""SSTV scanline-by-scanline image decoder.
+
+Decodes raw audio samples into a PIL Image for all supported SSTV modes.
+Handles sync pulse re-synchronization on each line for robust decoding
+under weak-signal or drifting conditions.
+"""
+
+from __future__ import annotations
+
+from typing import Callable
+
+import numpy as np
+
+from .constants import (
+ FREQ_BLACK,
+ FREQ_PIXEL_HIGH,
+ FREQ_PIXEL_LOW,
+ FREQ_SYNC,
+ SAMPLE_RATE,
+)
+from .dsp import (
+ goertzel,
+ goertzel_batch,
+ samples_for_duration,
+)
+from .modes import (
+ ColorModel,
+ SSTVMode,
+ SyncPosition,
+)
+
+# Pillow is imported lazily to keep the module importable when Pillow
+# is not installed (is_sstv_available() just returns True, but actual
+# decoding would fail gracefully).
+try:
+ from PIL import Image
+except ImportError:
+ Image = None # type: ignore[assignment,misc]
+
+
+# Type alias for progress callback: (current_line, total_lines)
+ProgressCallback = Callable[[int, int], None]
+
+
+class SSTVImageDecoder:
+ """Decode an SSTV image from a stream of audio samples.
+
+ Usage::
+
+ decoder = SSTVImageDecoder(mode)
+ decoder.feed(samples)
+ ...
+ if decoder.is_complete:
+ image = decoder.get_image()
+ """
+
+ def __init__(self, mode: SSTVMode, sample_rate: int = SAMPLE_RATE,
+ progress_cb: ProgressCallback | None = None):
+ self._mode = mode
+ self._sample_rate = sample_rate
+ self._progress_cb = progress_cb
+
+ self._buffer = np.array([], dtype=np.float64)
+ self._current_line = 0
+ self._complete = False
+
+ # Pre-calculate sample counts
+ self._sync_samples = samples_for_duration(
+ mode.sync_duration_ms / 1000.0, sample_rate)
+ self._porch_samples = samples_for_duration(
+ mode.sync_porch_ms / 1000.0, sample_rate)
+ self._line_samples = samples_for_duration(
+ mode.line_duration_ms / 1000.0, sample_rate)
+ self._separator_samples = (
+ samples_for_duration(mode.channel_separator_ms / 1000.0, sample_rate)
+ if mode.channel_separator_ms > 0 else 0
+ )
+
+ self._channel_samples = [
+ samples_for_duration(ch.duration_ms / 1000.0, sample_rate)
+ for ch in mode.channels
+ ]
+
+ # For PD modes, each "line" of audio produces 2 image lines
+ if mode.color_model == ColorModel.YCRCB_DUAL:
+ self._total_audio_lines = mode.height // 2
+ else:
+ self._total_audio_lines = mode.height
+
+ # Initialize pixel data arrays per channel
+ self._channel_data: list[np.ndarray] = []
+ for _i, _ch_spec in enumerate(mode.channels):
+ if mode.color_model == ColorModel.YCRCB_DUAL:
+ # Y1, Cr, Cb, Y2 - all are width-wide
+ self._channel_data.append(
+ np.zeros((self._total_audio_lines, mode.width), dtype=np.uint8))
+ else:
+ self._channel_data.append(
+ np.zeros((mode.height, mode.width), dtype=np.uint8))
+
+ # Pre-compute candidate frequencies for batch pixel decoding (5 Hz step)
+ self._freq_candidates = np.arange(
+ FREQ_PIXEL_LOW - 100, FREQ_PIXEL_HIGH + 105, 5.0)
+
+ # Track sync position for re-synchronization
+ self._expected_line_start = 0 # Sample offset within buffer
+ self._synced = False
+
+ @property
+ def is_complete(self) -> bool:
+ return self._complete
+
+ @property
+ def current_line(self) -> int:
+ return self._current_line
+
+ @property
+ def total_lines(self) -> int:
+ return self._total_audio_lines
+
+ @property
+ def progress_percent(self) -> int:
+ if self._total_audio_lines == 0:
+ return 0
+ return min(100, int(100 * self._current_line / self._total_audio_lines))
+
+ def feed(self, samples: np.ndarray) -> bool:
+ """Feed audio samples into the decoder.
+
+ Args:
+ samples: Float64 audio samples.
+
+ Returns:
+ True when image is complete.
+ """
+ if self._complete:
+ return True
+
+ self._buffer = np.concatenate([self._buffer, samples])
+
+ # Process complete lines
+ while not self._complete and len(self._buffer) >= self._line_samples:
+ self._decode_line()
+
+ # Prevent unbounded buffer growth - keep at most 2 lines worth
+ max_buffer = self._line_samples * 2
+ if len(self._buffer) > max_buffer and not self._complete:
+ self._buffer = self._buffer[-max_buffer:]
+
+ return self._complete
+
+ def _find_sync(self, search_region: np.ndarray) -> int | None:
+ """Find the 1200 Hz sync pulse within a search region.
+
+ Scans through the region looking for a stretch of 1200 Hz
+ tone of approximately the right duration.
+
+ Args:
+ search_region: Audio samples to search within.
+
+ Returns:
+ Sample offset of the sync pulse start, or None if not found.
+ """
+ window_size = min(self._sync_samples, 200)
+ if len(search_region) < window_size:
+ return None
+
+ best_pos = None
+ best_energy = 0.0
+
+ step = window_size // 2
+ for pos in range(0, len(search_region) - window_size, step):
+ chunk = search_region[pos:pos + window_size]
+ sync_energy = goertzel(chunk, FREQ_SYNC, self._sample_rate)
+ # Check it's actually sync, not data at 1200 Hz area
+ black_energy = goertzel(chunk, FREQ_BLACK, self._sample_rate)
+ if sync_energy > best_energy and sync_energy > black_energy * 2:
+ best_energy = sync_energy
+ best_pos = pos
+
+ return best_pos
+
+ def _decode_line(self) -> None:
+ """Decode one scanline from the buffer."""
+ if self._current_line >= self._total_audio_lines:
+ self._complete = True
+ return
+
+ # Try to find sync pulse for re-synchronization
+ # Search within +/-10% of expected line start
+ search_margin = max(100, self._line_samples // 10)
+
+ line_start = 0
+
+ if self._mode.sync_position in (SyncPosition.FRONT, SyncPosition.FRONT_PD):
+ # Sync is at the beginning of each line
+ search_start = 0
+ search_end = min(len(self._buffer), self._sync_samples + search_margin)
+ search_region = self._buffer[search_start:search_end]
+
+ sync_pos = self._find_sync(search_region)
+ if sync_pos is not None:
+ line_start = sync_pos
+ # Skip sync + porch to get to pixel data
+ pixel_start = line_start + self._sync_samples + self._porch_samples
+
+ elif self._mode.sync_position == SyncPosition.MIDDLE:
+ # Scottie: sep(1.5ms) -> G -> sep(1.5ms) -> B -> sync(9ms) -> porch(1.5ms) -> R
+ # Skip initial separator (same duration as porch)
+ pixel_start = self._porch_samples
+ line_start = 0
+
+ else:
+ pixel_start = self._sync_samples + self._porch_samples
+
+ # Decode each channel
+ pos = pixel_start
+ for ch_idx, ch_samples in enumerate(self._channel_samples):
+ if pos + ch_samples > len(self._buffer):
+ # Not enough data yet - put the data back and wait
+ return
+
+ channel_audio = self._buffer[pos:pos + ch_samples]
+ pixels = self._decode_channel_pixels(channel_audio)
+ self._channel_data[ch_idx][self._current_line, :] = pixels
+ pos += ch_samples
+
+ # Add inter-channel gaps based on mode family
+ if ch_idx < len(self._channel_samples) - 1:
+ if self._mode.sync_position == SyncPosition.MIDDLE:
+ if ch_idx == 0:
+ # Scottie: separator between G and B
+ pos += self._porch_samples
+ else:
+ # Scottie: sync + porch between B and R
+ pos += self._sync_samples + self._porch_samples
+ elif self._separator_samples > 0:
+ # Robot: separator + porch between channels
+ pos += self._separator_samples
+ elif (self._mode.sync_position == SyncPosition.FRONT
+ and self._mode.color_model == ColorModel.RGB):
+ # Martin: porch between channels
+ pos += self._porch_samples
+
+ # Advance buffer past this line
+ consumed = max(pos, self._line_samples)
+ self._buffer = self._buffer[consumed:]
+
+ self._current_line += 1
+
+ if self._progress_cb:
+ self._progress_cb(self._current_line, self._total_audio_lines)
+
+ if self._current_line >= self._total_audio_lines:
+ self._complete = True
+
+ # Minimum analysis window for meaningful Goertzel frequency estimation.
+ # With 96 samples (2ms at 48kHz), frequency accuracy is within ~25 Hz,
+ # giving pixel-level accuracy of ~8/255 levels.
+ _MIN_ANALYSIS_WINDOW = 96
+
+ def _decode_channel_pixels(self, audio: np.ndarray) -> np.ndarray:
+ """Decode pixel values from a channel's audio data.
+
+ Uses batch Goertzel to estimate frequencies for all pixels
+ simultaneously, then maps to luminance values. When pixels have
+ fewer samples than ``_MIN_ANALYSIS_WINDOW``, overlapping analysis
+ windows are used to maintain frequency estimation accuracy.
+
+ Args:
+ audio: Audio samples for one channel of one scanline.
+
+ Returns:
+ Array of pixel values (0-255), shape (width,).
+ """
+ width = self._mode.width
+ samples_per_pixel = max(1, len(audio) // width)
+
+ if len(audio) < width or samples_per_pixel < 2:
+ return np.zeros(width, dtype=np.uint8)
+
+ window_size = max(samples_per_pixel, self._MIN_ANALYSIS_WINDOW)
+
+ if window_size > samples_per_pixel and len(audio) >= window_size:
+ # Use overlapping windows centered on each pixel position
+ windows = np.lib.stride_tricks.sliding_window_view(
+ audio, window_size)
+ # Pixel centers, clamped to valid window indices
+ centers = np.arange(width) * samples_per_pixel
+ indices = np.minimum(centers, len(windows) - 1)
+ audio_matrix = np.ascontiguousarray(windows[indices])
+ else:
+ # Non-overlapping: each pixel has enough samples
+ usable = width * samples_per_pixel
+ audio_matrix = audio[:usable].reshape(width, samples_per_pixel)
+
+ # Batch Goertzel at all candidate frequencies
+ energies = goertzel_batch(
+ audio_matrix, self._freq_candidates, self._sample_rate)
+
+ # Find peak frequency per pixel
+ best_idx = np.argmax(energies, axis=1)
+ best_freqs = self._freq_candidates[best_idx]
+
+ # Map frequencies to pixel values (1500 Hz = 0, 2300 Hz = 255)
+ normalized = (best_freqs - FREQ_PIXEL_LOW) / (FREQ_PIXEL_HIGH - FREQ_PIXEL_LOW)
+ return np.clip(normalized * 255 + 0.5, 0, 255).astype(np.uint8)
+
+ def get_image(self) -> Image.Image | None:
+ """Convert decoded channel data to a PIL Image.
+
+ Returns:
+ PIL Image in RGB mode, or None if Pillow is not available
+ or decoding is incomplete.
+ """
+ if Image is None:
+ return None
+
+ mode = self._mode
+
+ if mode.color_model == ColorModel.RGB:
+ return self._assemble_rgb()
+ elif mode.color_model == ColorModel.YCRCB:
+ return self._assemble_ycrcb()
+ elif mode.color_model == ColorModel.YCRCB_DUAL:
+ return self._assemble_ycrcb_dual()
+
+ return None
+
+ def _assemble_rgb(self) -> Image.Image:
+ """Assemble RGB image from sequential R, G, B channel data.
+
+ Martin/Scottie channel order: G, B, R.
+ """
+ height = self._mode.height
+
+ # Channel order for Martin/Scottie: [0]=G, [1]=B, [2]=R
+ g_data = self._channel_data[0][:height]
+ b_data = self._channel_data[1][:height]
+ r_data = self._channel_data[2][:height]
+
+ rgb = np.stack([r_data, g_data, b_data], axis=-1)
+ return Image.fromarray(rgb, 'RGB')
+
+ def _assemble_ycrcb(self) -> Image.Image:
+ """Assemble image from YCrCb data (Robot modes).
+
+ Robot36: Y every line, Cr/Cb alternating (half-rate chroma).
+ Robot72: Y, Cr, Cb every line (full-rate chroma).
+ """
+ height = self._mode.height
+ width = self._mode.width
+
+ if not self._mode.has_half_rate_chroma:
+ # Full-rate chroma (Robot72): Y, Cr, Cb as separate channels
+ y_data = self._channel_data[0][:height].astype(np.float64)
+ cr = self._channel_data[1][:height].astype(np.float64)
+ cb = self._channel_data[2][:height].astype(np.float64)
+ return self._ycrcb_to_rgb(y_data, cr, cb, height, width)
+
+ # Half-rate chroma (Robot36): Y + alternating Cr/Cb
+ y_data = self._channel_data[0][:height].astype(np.float64)
+ chroma_data = self._channel_data[1][:height].astype(np.float64)
+
+ # Separate Cr (even lines) and Cb (odd lines), then interpolate
+ cr = np.zeros((height, width), dtype=np.float64)
+ cb = np.zeros((height, width), dtype=np.float64)
+
+ for line in range(height):
+ if line % 2 == 0:
+ cr[line] = chroma_data[line]
+ else:
+ cb[line] = chroma_data[line]
+
+ # Interpolate missing chroma lines
+ for line in range(height):
+ if line % 2 == 1:
+ # Missing Cr - interpolate from neighbors
+ prev_cr = line - 1 if line > 0 else line + 1
+ next_cr = line + 1 if line + 1 < height else line - 1
+ cr[line] = (cr[prev_cr] + cr[next_cr]) / 2
+ else:
+ # Missing Cb - interpolate from neighbors
+ prev_cb = line - 1 if line > 0 else line + 1
+ next_cb = line + 1 if line + 1 < height else line - 1
+ if prev_cb >= 0 and next_cb < height:
+ cb[line] = (cb[prev_cb] + cb[next_cb]) / 2
+ elif prev_cb >= 0:
+ cb[line] = cb[prev_cb]
+ else:
+ cb[line] = cb[next_cb]
+
+ return self._ycrcb_to_rgb(y_data, cr, cb, height, width)
+
+ def _assemble_ycrcb_dual(self) -> Image.Image:
+ """Assemble image from dual-luminance YCrCb data (PD modes).
+
+ PD modes send Y1, Cr, Cb, Y2 per audio line, producing 2 image lines.
+ """
+ audio_lines = self._total_audio_lines
+ width = self._mode.width
+ height = self._mode.height
+
+ y1_data = self._channel_data[0][:audio_lines].astype(np.float64)
+ cr_data = self._channel_data[1][:audio_lines].astype(np.float64)
+ cb_data = self._channel_data[2][:audio_lines].astype(np.float64)
+ y2_data = self._channel_data[3][:audio_lines].astype(np.float64)
+
+ # Interleave Y1 and Y2 to produce full-height luminance
+ y_full = np.zeros((height, width), dtype=np.float64)
+ cr_full = np.zeros((height, width), dtype=np.float64)
+ cb_full = np.zeros((height, width), dtype=np.float64)
+
+ for i in range(audio_lines):
+ even_line = i * 2
+ odd_line = i * 2 + 1
+ if even_line < height:
+ y_full[even_line] = y1_data[i]
+ cr_full[even_line] = cr_data[i]
+ cb_full[even_line] = cb_data[i]
+ if odd_line < height:
+ y_full[odd_line] = y2_data[i]
+ cr_full[odd_line] = cr_data[i]
+ cb_full[odd_line] = cb_data[i]
+
+ return self._ycrcb_to_rgb(y_full, cr_full, cb_full, height, width)
+
+ @staticmethod
+ def _ycrcb_to_rgb(y: np.ndarray, cr: np.ndarray, cb: np.ndarray,
+ height: int, width: int) -> Image.Image:
+ """Convert YCrCb pixel data to an RGB PIL Image.
+
+ Uses the SSTV convention where pixel values 0-255 map to the
+ standard Y'CbCr color space used by JPEG/SSTV.
+ """
+ # Normalize from 0-255 pixel range to standard ranges
+ # Y: 0-255, Cr/Cb: 0-255 centered at 128
+ y_norm = y
+ cr_norm = cr - 128.0
+ cb_norm = cb - 128.0
+
+ # ITU-R BT.601 conversion
+ r = y_norm + 1.402 * cr_norm
+ g = y_norm - 0.344136 * cb_norm - 0.714136 * cr_norm
+ b = y_norm + 1.772 * cb_norm
+
+ # Clip and convert
+ r = np.clip(r, 0, 255).astype(np.uint8)
+ g = np.clip(g, 0, 255).astype(np.uint8)
+ b = np.clip(b, 0, 255).astype(np.uint8)
+
+ rgb = np.stack([r, g, b], axis=-1)
+ return Image.fromarray(rgb, 'RGB')
diff --git a/utils/sstv/modes.py b/utils/sstv/modes.py
new file mode 100644
index 0000000..6d2d7b4
--- /dev/null
+++ b/utils/sstv/modes.py
@@ -0,0 +1,250 @@
+"""SSTV mode specifications.
+
+Dataclass definitions for each supported SSTV mode, encoding resolution,
+color model, line timing, and sync characteristics.
+"""
+
+from __future__ import annotations
+
+import enum
+from dataclasses import dataclass, field
+
+
+class ColorModel(enum.Enum):
+ """Color encoding models used by SSTV modes."""
+ RGB = 'rgb' # Sequential R, G, B channels per line
+ YCRCB = 'ycrcb' # Luminance + chrominance (Robot modes)
+ YCRCB_DUAL = 'ycrcb_dual' # Dual-luminance YCrCb (PD modes)
+
+
+class SyncPosition(enum.Enum):
+ """Where the horizontal sync pulse appears in each line."""
+ FRONT = 'front' # Sync at start of line (Robot, Martin)
+ MIDDLE = 'middle' # Sync between G and B channels (Scottie)
+ FRONT_PD = 'front_pd' # PD-style sync at start
+
+
+@dataclass(frozen=True)
+class ChannelTiming:
+ """Timing for a single color channel within a scanline.
+
+ Attributes:
+ duration_ms: Duration of this channel's pixel data in milliseconds.
+ """
+ duration_ms: float
+
+
+@dataclass(frozen=True)
+class SSTVMode:
+ """Complete specification of an SSTV mode.
+
+ Attributes:
+ name: Human-readable mode name (e.g. 'Robot36').
+ vis_code: VIS code that identifies this mode.
+ width: Image width in pixels.
+ height: Image height in lines.
+ color_model: Color encoding model.
+ sync_position: Where the sync pulse falls in each line.
+ sync_duration_ms: Horizontal sync pulse duration (ms).
+ sync_porch_ms: Porch (gap) after sync pulse (ms).
+ channels: Timing for each color channel per line.
+ line_duration_ms: Total duration of one complete scanline (ms).
+ has_half_rate_chroma: Whether chroma is sent at half vertical rate
+ (Robot modes: Cr and Cb alternate every other line).
+ """
+ name: str
+ vis_code: int
+ width: int
+ height: int
+ color_model: ColorModel
+ sync_position: SyncPosition
+ sync_duration_ms: float
+ sync_porch_ms: float
+ channels: list[ChannelTiming] = field(default_factory=list)
+ line_duration_ms: float = 0.0
+ has_half_rate_chroma: bool = False
+ channel_separator_ms: float = 0.0 # Time gap between color channels (ms)
+
+
+# ---------------------------------------------------------------------------
+# Robot family
+# ---------------------------------------------------------------------------
+
+ROBOT_36 = SSTVMode(
+ name='Robot36',
+ vis_code=8,
+ width=320,
+ height=240,
+ color_model=ColorModel.YCRCB,
+ sync_position=SyncPosition.FRONT,
+ sync_duration_ms=9.0,
+ sync_porch_ms=3.0,
+ channels=[
+ ChannelTiming(duration_ms=88.0), # Y (luminance)
+ ChannelTiming(duration_ms=44.0), # Cr or Cb (alternating)
+ ],
+ line_duration_ms=150.0,
+ has_half_rate_chroma=True,
+ channel_separator_ms=6.0,
+)
+
+ROBOT_72 = SSTVMode(
+ name='Robot72',
+ vis_code=12,
+ width=320,
+ height=240,
+ color_model=ColorModel.YCRCB,
+ sync_position=SyncPosition.FRONT,
+ sync_duration_ms=9.0,
+ sync_porch_ms=3.0,
+ channels=[
+ ChannelTiming(duration_ms=138.0), # Y (luminance)
+ ChannelTiming(duration_ms=69.0), # Cr
+ ChannelTiming(duration_ms=69.0), # Cb
+ ],
+ line_duration_ms=300.0,
+ has_half_rate_chroma=False,
+ channel_separator_ms=6.0,
+)
+
+# ---------------------------------------------------------------------------
+# Martin family
+# ---------------------------------------------------------------------------
+
+MARTIN_1 = SSTVMode(
+ name='Martin1',
+ vis_code=44,
+ width=320,
+ height=256,
+ color_model=ColorModel.RGB,
+ sync_position=SyncPosition.FRONT,
+ sync_duration_ms=4.862,
+ sync_porch_ms=0.572,
+ channels=[
+ ChannelTiming(duration_ms=146.432), # Green
+ ChannelTiming(duration_ms=146.432), # Blue
+ ChannelTiming(duration_ms=146.432), # Red
+ ],
+ line_duration_ms=446.446,
+)
+
+MARTIN_2 = SSTVMode(
+ name='Martin2',
+ vis_code=40,
+ width=320,
+ height=256,
+ color_model=ColorModel.RGB,
+ sync_position=SyncPosition.FRONT,
+ sync_duration_ms=4.862,
+ sync_porch_ms=0.572,
+ channels=[
+ ChannelTiming(duration_ms=73.216), # Green
+ ChannelTiming(duration_ms=73.216), # Blue
+ ChannelTiming(duration_ms=73.216), # Red
+ ],
+ line_duration_ms=226.798,
+)
+
+# ---------------------------------------------------------------------------
+# Scottie family
+# ---------------------------------------------------------------------------
+
+SCOTTIE_1 = SSTVMode(
+ name='Scottie1',
+ vis_code=60,
+ width=320,
+ height=256,
+ color_model=ColorModel.RGB,
+ sync_position=SyncPosition.MIDDLE,
+ sync_duration_ms=9.0,
+ sync_porch_ms=1.5,
+ channels=[
+ ChannelTiming(duration_ms=138.240), # Green
+ ChannelTiming(duration_ms=138.240), # Blue
+ ChannelTiming(duration_ms=138.240), # Red
+ ],
+ line_duration_ms=428.220,
+)
+
+SCOTTIE_2 = SSTVMode(
+ name='Scottie2',
+ vis_code=56,
+ width=320,
+ height=256,
+ color_model=ColorModel.RGB,
+ sync_position=SyncPosition.MIDDLE,
+ sync_duration_ms=9.0,
+ sync_porch_ms=1.5,
+ channels=[
+ ChannelTiming(duration_ms=88.064), # Green
+ ChannelTiming(duration_ms=88.064), # Blue
+ ChannelTiming(duration_ms=88.064), # Red
+ ],
+ line_duration_ms=277.692,
+)
+
+# ---------------------------------------------------------------------------
+# PD (Pasokon) family
+# ---------------------------------------------------------------------------
+
+PD_120 = SSTVMode(
+ name='PD120',
+ vis_code=93,
+ width=640,
+ height=496,
+ color_model=ColorModel.YCRCB_DUAL,
+ sync_position=SyncPosition.FRONT_PD,
+ sync_duration_ms=20.0,
+ sync_porch_ms=2.080,
+ channels=[
+ ChannelTiming(duration_ms=121.600), # Y1 (even line luminance)
+ ChannelTiming(duration_ms=121.600), # Cr
+ ChannelTiming(duration_ms=121.600), # Cb
+ ChannelTiming(duration_ms=121.600), # Y2 (odd line luminance)
+ ],
+ line_duration_ms=508.480,
+)
+
+PD_180 = SSTVMode(
+ name='PD180',
+ vis_code=95,
+ width=640,
+ height=496,
+ color_model=ColorModel.YCRCB_DUAL,
+ sync_position=SyncPosition.FRONT_PD,
+ sync_duration_ms=20.0,
+ sync_porch_ms=2.080,
+ channels=[
+ ChannelTiming(duration_ms=183.040), # Y1
+ ChannelTiming(duration_ms=183.040), # Cr
+ ChannelTiming(duration_ms=183.040), # Cb
+ ChannelTiming(duration_ms=183.040), # Y2
+ ],
+ line_duration_ms=754.240,
+)
+
+
+# ---------------------------------------------------------------------------
+# Mode registry
+# ---------------------------------------------------------------------------
+
+ALL_MODES: dict[int, SSTVMode] = {
+ m.vis_code: m for m in [
+ ROBOT_36, ROBOT_72,
+ MARTIN_1, MARTIN_2,
+ SCOTTIE_1, SCOTTIE_2,
+ PD_120, PD_180,
+ ]
+}
+
+MODE_BY_NAME: dict[str, SSTVMode] = {m.name: m for m in ALL_MODES.values()}
+
+
+def get_mode(vis_code: int) -> SSTVMode | None:
+ """Look up an SSTV mode by its VIS code."""
+ return ALL_MODES.get(vis_code)
+
+
+def get_mode_by_name(name: str) -> SSTVMode | None:
+ """Look up an SSTV mode by name."""
+ return MODE_BY_NAME.get(name)
diff --git a/utils/sstv.py b/utils/sstv/sstv_decoder.py
similarity index 53%
rename from utils/sstv.py
rename to utils/sstv/sstv_decoder.py
index 74deb76..881b57b 100644
--- a/utils/sstv.py
+++ b/utils/sstv/sstv_decoder.py
@@ -1,792 +1,782 @@
-"""SSTV (Slow-Scan Television) decoder for ISS transmissions.
-
-This module provides SSTV decoding capabilities for receiving images
-from the International Space Station during special events.
-
-ISS SSTV typically transmits on 145.800 MHz FM.
-
-Includes real-time Doppler shift compensation for improved reception.
-"""
-
-from __future__ import annotations
-
-import os
-import queue
-import subprocess
-import threading
-import time
-from dataclasses import dataclass, field
-from datetime import datetime, timezone, timedelta
-from pathlib import Path
-from typing import Callable
-
-from utils.logging import get_logger
-
-logger = get_logger('intercept.sstv')
-
-# ISS SSTV frequency
-ISS_SSTV_FREQ = 145.800 # MHz
-
-# Speed of light in m/s
-SPEED_OF_LIGHT = 299_792_458
-
-# Common SSTV modes used by ISS
-SSTV_MODES = ['PD120', 'PD180', 'Martin1', 'Martin2', 'Scottie1', 'Scottie2', 'Robot36']
-
-
-@dataclass
-class DopplerInfo:
- """Doppler shift information."""
- frequency_hz: float # Doppler-corrected frequency in Hz
- shift_hz: float # Doppler shift in Hz (positive = approaching)
- range_rate_km_s: float # Range rate in km/s (negative = approaching)
- elevation: float # Current elevation in degrees
- azimuth: float # Current azimuth in degrees
- timestamp: datetime
-
- def to_dict(self) -> dict:
- return {
- 'frequency_hz': self.frequency_hz,
- 'shift_hz': round(self.shift_hz, 1),
- 'range_rate_km_s': round(self.range_rate_km_s, 3),
- 'elevation': round(self.elevation, 1),
- 'azimuth': round(self.azimuth, 1),
- 'timestamp': self.timestamp.isoformat(),
- }
-
-
-class DopplerTracker:
- """
- Real-time Doppler shift calculator for satellite tracking.
-
- Uses skyfield to calculate the range rate between observer and satellite,
- then computes the Doppler-shifted receive frequency.
- """
-
- def __init__(self, satellite_name: str = 'ISS'):
- self._satellite_name = satellite_name
- self._observer_lat: float | None = None
- self._observer_lon: float | None = None
- self._satellite = None
- self._observer = None
- self._ts = None
- self._enabled = False
-
- def configure(self, latitude: float, longitude: float) -> bool:
- """
- Configure the Doppler tracker with observer location.
-
- Args:
- latitude: Observer latitude in degrees
- longitude: Observer longitude in degrees
-
- Returns:
- True if configured successfully
- """
- try:
- from skyfield.api import load, wgs84, EarthSatellite
- from data.satellites import TLE_SATELLITES
-
- # Get satellite TLE
- tle_data = TLE_SATELLITES.get(self._satellite_name)
- if not tle_data:
- logger.error(f"No TLE data for satellite: {self._satellite_name}")
- return False
-
- self._ts = load.timescale()
- self._satellite = EarthSatellite(tle_data[1], tle_data[2], tle_data[0], self._ts)
- self._observer = wgs84.latlon(latitude, longitude)
- self._observer_lat = latitude
- self._observer_lon = longitude
- self._enabled = True
-
- logger.info(f"Doppler tracker configured for {self._satellite_name} at ({latitude}, {longitude})")
- return True
-
- except ImportError:
- logger.warning("skyfield not available - Doppler tracking disabled")
- return False
- except Exception as e:
- logger.error(f"Failed to configure Doppler tracker: {e}")
- return False
-
- @property
- def is_enabled(self) -> bool:
- return self._enabled
-
- def calculate(self, nominal_freq_mhz: float) -> DopplerInfo | None:
- """
- Calculate current Doppler-shifted frequency.
-
- Args:
- nominal_freq_mhz: Nominal transmit frequency in MHz
-
- Returns:
- DopplerInfo with corrected frequency, or None if unavailable
- """
- if not self._enabled or not self._satellite or not self._observer:
- return None
-
- try:
- # Get current time
- t = self._ts.now()
-
- # Calculate satellite position relative to observer
- difference = self._satellite - self._observer
- topocentric = difference.at(t)
-
- # Get altitude/azimuth
- alt, az, distance = topocentric.altaz()
-
- # Get velocity (range rate) - negative means approaching
- # We need the rate of change of distance
- # Calculate positions slightly apart to get velocity
- dt_seconds = 1.0
- t_future = self._ts.utc(t.utc_datetime() + timedelta(seconds=dt_seconds))
-
- topocentric_future = difference.at(t_future)
- _, _, distance_future = topocentric_future.altaz()
-
- # Range rate in km/s (negative = approaching = positive Doppler)
- range_rate_km_s = (distance_future.km - distance.km) / dt_seconds
-
- # Calculate Doppler shift
- # f_received = f_transmitted * (1 - v_radial / c)
- # When approaching (negative range_rate), frequency is higher
- nominal_freq_hz = nominal_freq_mhz * 1_000_000
- doppler_factor = 1 - (range_rate_km_s * 1000 / SPEED_OF_LIGHT)
- corrected_freq_hz = nominal_freq_hz * doppler_factor
- shift_hz = corrected_freq_hz - nominal_freq_hz
-
- return DopplerInfo(
- frequency_hz=corrected_freq_hz,
- shift_hz=shift_hz,
- range_rate_km_s=range_rate_km_s,
- elevation=alt.degrees,
- azimuth=az.degrees,
- timestamp=datetime.now(timezone.utc)
- )
-
- except Exception as e:
- logger.error(f"Doppler calculation failed: {e}")
- return None
-
-
-@dataclass
-class SSTVImage:
- """Decoded SSTV image."""
- filename: str
- path: Path
- mode: str
- timestamp: datetime
- frequency: float
- size_bytes: int = 0
- url_prefix: str = '/sstv'
-
- def to_dict(self) -> dict:
- return {
- 'filename': self.filename,
- 'path': str(self.path),
- 'mode': self.mode,
- 'timestamp': self.timestamp.isoformat(),
- 'frequency': self.frequency,
- 'size_bytes': self.size_bytes,
- 'url': f'{self.url_prefix}/images/{self.filename}'
- }
-
-
-@dataclass
-class DecodeProgress:
- """SSTV decode progress update."""
- status: str # 'detecting', 'decoding', 'complete', 'error'
- mode: str | None = None
- progress_percent: int = 0
- message: str | None = None
- image: SSTVImage | None = None
-
- def to_dict(self) -> dict:
- result = {
- 'type': 'sstv_progress',
- 'status': self.status,
- 'progress': self.progress_percent,
- }
- if self.mode:
- result['mode'] = self.mode
- if self.message:
- result['message'] = self.message
- if self.image:
- result['image'] = self.image.to_dict()
- return result
-
-
-class SSTVDecoder:
- """SSTV decoder using external tools (slowrx) with Doppler compensation."""
-
- # Minimum frequency change (Hz) before retuning rtl_fm
- RETUNE_THRESHOLD_HZ = 500
-
- # How often to check/update Doppler (seconds)
- DOPPLER_UPDATE_INTERVAL = 5
-
- def __init__(self, output_dir: str | Path | None = None, url_prefix: str = '/sstv'):
- self._process = None
- self._rtl_process = None
- self._running = False
- self._lock = threading.Lock()
- self._callback: Callable[[DecodeProgress], None] | None = None
- self._output_dir = Path(output_dir) if output_dir else Path('instance/sstv_images')
- self._url_prefix = url_prefix
- self._images: list[SSTVImage] = []
- self._reader_thread = None
- self._watcher_thread = None
- self._doppler_thread = None
- self._frequency = ISS_SSTV_FREQ
- self._modulation = 'fm'
- self._current_tuned_freq_hz: int = 0
- self._device_index = 0
-
- # Doppler tracking
- self._doppler_tracker = DopplerTracker('ISS')
- self._doppler_enabled = False
- self._last_doppler_info: DopplerInfo | None = None
- self._file_decoder: str | None = None
-
- # Ensure output directory exists
- self._output_dir.mkdir(parents=True, exist_ok=True)
-
- # Detect available decoder
- self._decoder = self._detect_decoder()
-
- @property
- def is_running(self) -> bool:
- return self._running
-
- @property
- def decoder_available(self) -> str | None:
- """Return name of available decoder or None."""
- return self._decoder
-
- def _detect_decoder(self) -> str | None:
- """Detect which SSTV decoder is available."""
- # Check for slowrx (command-line SSTV decoder)
- try:
- result = subprocess.run(['which', 'slowrx'], capture_output=True, timeout=5)
- if result.returncode == 0:
- self._file_decoder = 'slowrx'
- return 'slowrx'
- except Exception:
- pass
-
- # Note: qsstv is GUI-only and not suitable for headless/server operation
-
- # Check for Python sstv package
- try:
- import sstv
- self._file_decoder = 'python-sstv'
- return None
- except ImportError:
- pass
-
- logger.warning("No SSTV decoder found. Install slowrx (apt install slowrx) or python sstv package. Note: qsstv is GUI-only and not supported for headless operation.")
- return None
-
- def set_callback(self, callback: Callable[[DecodeProgress], None]) -> None:
- """Set callback for decode progress updates."""
- self._callback = callback
-
- def start(
- self,
- frequency: float = ISS_SSTV_FREQ,
- device_index: int = 0,
- latitude: float | None = None,
- longitude: float | None = None,
- modulation: str = 'fm',
- ) -> bool:
- """
- Start SSTV decoder listening on specified frequency.
-
- Args:
- frequency: Frequency in MHz (default: 145.800 for ISS)
- device_index: RTL-SDR device index
- latitude: Observer latitude for Doppler correction (optional)
- longitude: Observer longitude for Doppler correction (optional)
- modulation: Demodulation mode for rtl_fm (fm, usb, lsb). Default: fm
-
- Returns:
- True if started successfully
- """
- with self._lock:
- if self._running:
- return True
-
- if not self._decoder:
- logger.error("No SSTV decoder available")
- self._emit_progress(DecodeProgress(
- status='error',
- message='No SSTV decoder installed. Install slowrx: apt install slowrx'
- ))
- return False
-
- self._frequency = frequency
- self._device_index = device_index
- self._modulation = modulation
-
- # Configure Doppler tracking if location provided
- self._doppler_enabled = False
- if latitude is not None and longitude is not None:
- if self._doppler_tracker.configure(latitude, longitude):
- self._doppler_enabled = True
- logger.info(f"Doppler tracking enabled for location ({latitude}, {longitude})")
- else:
- logger.warning("Doppler tracking unavailable - using fixed frequency")
-
- try:
- if self._decoder == 'slowrx':
- self._start_slowrx()
- elif self._decoder == 'python-sstv':
- self._start_python_sstv()
- else:
- logger.error(f"Unsupported decoder: {self._decoder}")
- return False
-
- self._running = True
-
- # Start Doppler tracking thread if enabled
- if self._doppler_enabled:
- self._doppler_thread = threading.Thread(target=self._doppler_tracking_loop, daemon=True)
- self._doppler_thread.start()
- logger.info(f"SSTV decoder started on {frequency} MHz with Doppler tracking")
- self._emit_progress(DecodeProgress(
- status='detecting',
- message=f'Listening on {frequency} MHz with Doppler tracking...'
- ))
- else:
- logger.info(f"SSTV decoder started on {frequency} MHz (no Doppler tracking)")
- self._emit_progress(DecodeProgress(
- status='detecting',
- message=f'Listening on {frequency} MHz...'
- ))
-
- return True
-
- except Exception as e:
- logger.error(f"Failed to start SSTV decoder: {e}")
- self._emit_progress(DecodeProgress(
- status='error',
- message=str(e)
- ))
- return False
-
- def _start_slowrx(self) -> None:
- """Start slowrx decoder with rtl_fm piped input."""
- # Calculate initial frequency (with Doppler correction if enabled)
- freq_hz = self._get_doppler_corrected_freq_hz()
- self._current_tuned_freq_hz = freq_hz
-
- self._start_rtl_fm_pipeline(freq_hz)
-
- def _get_doppler_corrected_freq_hz(self) -> int:
- """Get the Doppler-corrected frequency in Hz."""
- nominal_freq_hz = int(self._frequency * 1_000_000)
-
- if self._doppler_enabled:
- doppler_info = self._doppler_tracker.calculate(self._frequency)
- if doppler_info:
- self._last_doppler_info = doppler_info
- corrected_hz = int(doppler_info.frequency_hz)
- logger.info(
- f"Doppler correction: {doppler_info.shift_hz:+.1f} Hz "
- f"(range rate: {doppler_info.range_rate_km_s:+.3f} km/s, "
- f"el: {doppler_info.elevation:.1f}°)"
- )
- return corrected_hz
-
- return nominal_freq_hz
-
- def _start_rtl_fm_pipeline(self, freq_hz: int) -> None:
- """Start the rtl_fm -> slowrx pipeline at the specified frequency."""
- # Build rtl_fm command for demodulation
- rtl_cmd = [
- 'rtl_fm',
- '-d', str(self._device_index),
- '-f', str(freq_hz),
- '-M', self._modulation,
- '-s', '48000',
- '-r', '48000',
- '-l', '0', # No squelch
- '-'
- ]
-
- # slowrx reads from stdin and outputs images to directory
- slowrx_cmd = [
- 'slowrx',
- '-o', str(self._output_dir),
- '-'
- ]
-
- logger.info(f"Starting rtl_fm: {' '.join(rtl_cmd)}")
- logger.info(f"Piping to slowrx: {' '.join(slowrx_cmd)}")
-
- # Start rtl_fm
- self._rtl_process = subprocess.Popen(
- rtl_cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE
- )
-
- # Start slowrx reading from rtl_fm
- self._process = subprocess.Popen(
- slowrx_cmd,
- stdin=self._rtl_process.stdout,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE
- )
-
- # Start reader thread to monitor output
- self._reader_thread = threading.Thread(target=self._read_slowrx_output, daemon=True)
- self._reader_thread.start()
-
- # Start image watcher thread
- self._watcher_thread = threading.Thread(target=self._watch_images, daemon=True)
- self._watcher_thread.start()
-
- def _doppler_tracking_loop(self) -> None:
- """Background thread that monitors Doppler shift and retunes when needed."""
- logger.info("Doppler tracking thread started")
-
- while self._running and self._doppler_enabled:
- time.sleep(self.DOPPLER_UPDATE_INTERVAL)
-
- if not self._running:
- break
-
- try:
- doppler_info = self._doppler_tracker.calculate(self._frequency)
- if not doppler_info:
- continue
-
- self._last_doppler_info = doppler_info
- new_freq_hz = int(doppler_info.frequency_hz)
- freq_diff = abs(new_freq_hz - self._current_tuned_freq_hz)
-
- # Log current Doppler status
- logger.debug(
- f"Doppler: {doppler_info.shift_hz:+.1f} Hz, "
- f"el: {doppler_info.elevation:.1f}°, "
- f"diff from tuned: {freq_diff} Hz"
- )
-
- # Emit Doppler update to callback
- self._emit_progress(DecodeProgress(
- status='detecting',
- message=f'Doppler: {doppler_info.shift_hz:+.0f} Hz, elevation: {doppler_info.elevation:.1f}°'
- ))
-
- # Retune if frequency has drifted enough
- if freq_diff >= self.RETUNE_THRESHOLD_HZ:
- logger.info(
- f"Retuning: {self._current_tuned_freq_hz} -> {new_freq_hz} Hz "
- f"(Doppler shift: {doppler_info.shift_hz:+.1f} Hz)"
- )
- self._retune_rtl_fm(new_freq_hz)
-
- except Exception as e:
- logger.error(f"Doppler tracking error: {e}")
-
- logger.info("Doppler tracking thread stopped")
-
- def _retune_rtl_fm(self, new_freq_hz: int) -> None:
- """
- Retune rtl_fm to a new frequency.
-
- Since rtl_fm doesn't support dynamic frequency changes, we need to
- restart the rtl_fm process. The slowrx process continues running
- and will resume decoding when audio resumes.
- """
- with self._lock:
- if not self._running:
- return
-
- # Terminate old rtl_fm process
- if self._rtl_process:
- try:
- self._rtl_process.terminate()
- self._rtl_process.wait(timeout=2)
- except Exception:
- try:
- self._rtl_process.kill()
- except Exception:
- pass
-
- # Start new rtl_fm at new frequency
- rtl_cmd = [
- 'rtl_fm',
- '-d', str(self._device_index),
- '-f', str(new_freq_hz),
- '-M', self._modulation,
- '-s', '48000',
- '-r', '48000',
- '-l', '0',
- '-'
- ]
-
- logger.debug(f"Restarting rtl_fm: {' '.join(rtl_cmd)}")
-
- self._rtl_process = subprocess.Popen(
- rtl_cmd,
- stdout=self._process.stdin if self._process else subprocess.PIPE,
- stderr=subprocess.PIPE
- )
-
- self._current_tuned_freq_hz = new_freq_hz
-
- @property
- def last_doppler_info(self) -> DopplerInfo | None:
- """Get the most recent Doppler calculation."""
- return self._last_doppler_info
-
- @property
- def doppler_enabled(self) -> bool:
- """Check if Doppler tracking is enabled."""
- return self._doppler_enabled
-
- def _start_python_sstv(self) -> None:
- """Start Python SSTV decoder (requires audio file input)."""
- # Python sstv package typically works with audio files
- # For real-time decoding, we'd need to record audio first
- # This is a simplified implementation
- logger.warning("Python SSTV package requires audio file input")
- self._emit_progress(DecodeProgress(
- status='error',
- message='Python SSTV decoder requires audio files. Use slowrx for real-time decoding.'
- ))
- raise NotImplementedError("Real-time Python SSTV not implemented")
-
- def _read_slowrx_output(self) -> None:
- """Read slowrx stderr for progress updates."""
- if not self._process:
- return
-
- try:
- for line in iter(self._process.stderr.readline, b''):
- if not self._running:
- break
-
- line_str = line.decode('utf-8', errors='ignore').strip()
- if not line_str:
- continue
-
- logger.debug(f"slowrx: {line_str}")
-
- # Parse slowrx output for mode detection and progress
- if 'Detected' in line_str or 'mode' in line_str.lower():
- for mode in SSTV_MODES:
- if mode.lower() in line_str.lower():
- self._emit_progress(DecodeProgress(
- status='decoding',
- mode=mode,
- message=f'Decoding {mode} image...'
- ))
- break
-
- except Exception as e:
- logger.error(f"Error reading slowrx output: {e}")
-
- def _watch_images(self) -> None:
- """Watch output directory for new images."""
- known_files = set(f.name for f in self._output_dir.glob('*.png'))
-
- while self._running:
- time.sleep(1)
-
- try:
- current_files = set(f.name for f in self._output_dir.glob('*.png'))
- new_files = current_files - known_files
-
- for filename in new_files:
- filepath = self._output_dir / filename
- if filepath.exists():
- # New image detected
- image = SSTVImage(
- filename=filename,
- path=filepath,
- mode='Unknown', # Would need to parse from slowrx output
- timestamp=datetime.now(timezone.utc),
- frequency=self._frequency,
- size_bytes=filepath.stat().st_size,
- url_prefix=self._url_prefix,
- )
- self._images.append(image)
-
- logger.info(f"New SSTV image: {filename}")
- self._emit_progress(DecodeProgress(
- status='complete',
- message='Image decoded',
- image=image
- ))
-
- known_files = current_files
-
- except Exception as e:
- logger.error(f"Error watching images: {e}")
-
- def stop(self) -> None:
- """Stop SSTV decoder."""
- with self._lock:
- self._running = False
-
- if hasattr(self, '_rtl_process') and self._rtl_process:
- try:
- self._rtl_process.terminate()
- self._rtl_process.wait(timeout=5)
- except Exception:
- self._rtl_process.kill()
- self._rtl_process = None
-
- if self._process:
- try:
- self._process.terminate()
- self._process.wait(timeout=5)
- except Exception:
- self._process.kill()
- self._process = None
-
- logger.info("SSTV decoder stopped")
-
- def get_images(self) -> list[SSTVImage]:
- """Get list of decoded images."""
- # Also scan directory for any images we might have missed
- self._scan_images()
- return list(self._images)
-
- def _scan_images(self) -> None:
- """Scan output directory for images."""
- known_filenames = {img.filename for img in self._images}
-
- for filepath in self._output_dir.glob('*.png'):
- if filepath.name not in known_filenames:
- try:
- stat = filepath.stat()
- image = SSTVImage(
- filename=filepath.name,
- path=filepath,
- mode='Unknown',
- timestamp=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
- frequency=self._frequency,
- size_bytes=stat.st_size,
- url_prefix=self._url_prefix,
- )
- self._images.append(image)
- except Exception as e:
- logger.warning(f"Error scanning image {filepath}: {e}")
-
- def _emit_progress(self, progress: DecodeProgress) -> None:
- """Emit progress update to callback."""
- if self._callback:
- try:
- self._callback(progress)
- except Exception as e:
- logger.error(f"Error in progress callback: {e}")
-
- def decode_file(self, audio_path: str | Path) -> list[SSTVImage]:
- """
- Decode SSTV image from audio file.
-
- Args:
- audio_path: Path to WAV audio file
-
- Returns:
- List of decoded images
- """
- audio_path = Path(audio_path)
- if not audio_path.exists():
- raise FileNotFoundError(f"Audio file not found: {audio_path}")
-
- images = []
-
- decoder = self._decoder or self._file_decoder
-
- if decoder == 'slowrx':
- # Use slowrx with file input
- output_file = self._output_dir / f"sstv_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
-
- cmd = ['slowrx', '-o', str(self._output_dir), str(audio_path)]
- result = subprocess.run(cmd, capture_output=True, timeout=300)
-
- if result.returncode == 0:
- # Check for new images
- for filepath in self._output_dir.glob('*.png'):
- stat = filepath.stat()
- if stat.st_mtime > time.time() - 60: # Created in last minute
- image = SSTVImage(
- filename=filepath.name,
- path=filepath,
- mode='Unknown',
- timestamp=datetime.now(timezone.utc),
- frequency=0,
- size_bytes=stat.st_size
- )
- images.append(image)
-
- elif decoder == 'python-sstv':
- # Use Python sstv library
- try:
- from sstv.decode import SSTVDecoder as PythonSSTVDecoder
- from PIL import Image
-
- decoder = PythonSSTVDecoder(str(audio_path))
- img = decoder.decode()
-
- if img:
- output_file = self._output_dir / f"sstv_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
- img.save(output_file)
-
- image = SSTVImage(
- filename=output_file.name,
- path=output_file,
- mode=decoder.mode or 'Unknown',
- timestamp=datetime.now(timezone.utc),
- frequency=0,
- size_bytes=output_file.stat().st_size
- )
- images.append(image)
-
- except ImportError:
- logger.error("Python sstv package not properly installed")
- except Exception as e:
- logger.error(f"Error decoding with Python sstv: {e}")
-
- return images
-
-
-# Global decoder instance
-_decoder: SSTVDecoder | None = None
-
-
-def get_sstv_decoder() -> SSTVDecoder:
- """Get or create the global SSTV decoder instance."""
- global _decoder
- if _decoder is None:
- _decoder = SSTVDecoder()
- return _decoder
-
-
-def is_sstv_available() -> bool:
- """Check if SSTV decoding is available."""
- decoder = get_sstv_decoder()
- return decoder.decoder_available is not None
-
-
-# Global general SSTV decoder instance (separate from ISS)
-_general_decoder: SSTVDecoder | None = None
-
-
-def get_general_sstv_decoder() -> SSTVDecoder:
- """Get or create the global general SSTV decoder instance."""
- global _general_decoder
- if _general_decoder is None:
- _general_decoder = SSTVDecoder(
- output_dir='instance/sstv_general_images',
- url_prefix='/sstv-general',
- )
- return _general_decoder
+"""SSTV decoder orchestrator.
+
+Provides the SSTVDecoder class that manages the full pipeline:
+rtl_fm subprocess -> audio stream -> VIS detection -> image decoding -> PNG output.
+
+Also contains DopplerTracker and supporting dataclasses migrated from the
+original monolithic utils/sstv.py.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import subprocess
+import threading
+import time
+from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Callable
+
+import numpy as np
+
+from utils.logging import get_logger
+
+from .constants import ISS_SSTV_FREQ, SAMPLE_RATE, SPEED_OF_LIGHT
+from .dsp import normalize_audio
+from .image_decoder import SSTVImageDecoder
+from .modes import get_mode
+from .vis import VISDetector
+
+logger = get_logger('intercept.sstv')
+
+try:
+ from PIL import Image as PILImage
+except ImportError:
+ PILImage = None # type: ignore[assignment,misc]
+
+
+# ---------------------------------------------------------------------------
+# Dataclasses
+# ---------------------------------------------------------------------------
+
+@dataclass
+class DopplerInfo:
+ """Doppler shift information."""
+ frequency_hz: float
+ shift_hz: float
+ range_rate_km_s: float
+ elevation: float
+ azimuth: float
+ timestamp: datetime
+
+ def to_dict(self) -> dict:
+ return {
+ 'frequency_hz': self.frequency_hz,
+ 'shift_hz': round(self.shift_hz, 1),
+ 'range_rate_km_s': round(self.range_rate_km_s, 3),
+ 'elevation': round(self.elevation, 1),
+ 'azimuth': round(self.azimuth, 1),
+ 'timestamp': self.timestamp.isoformat(),
+ }
+
+
+@dataclass
+class SSTVImage:
+ """Decoded SSTV image."""
+ filename: str
+ path: Path
+ mode: str
+ timestamp: datetime
+ frequency: float
+ size_bytes: int = 0
+ url_prefix: str = '/sstv'
+
+ def to_dict(self) -> dict:
+ return {
+ 'filename': self.filename,
+ 'path': str(self.path),
+ 'mode': self.mode,
+ 'timestamp': self.timestamp.isoformat(),
+ 'frequency': self.frequency,
+ 'size_bytes': self.size_bytes,
+ 'url': f'{self.url_prefix}/images/{self.filename}'
+ }
+
+
+@dataclass
+class DecodeProgress:
+ """SSTV decode progress update."""
+ status: str # 'detecting', 'decoding', 'complete', 'error'
+ mode: str | None = None
+ progress_percent: int = 0
+ message: str | None = None
+ image: SSTVImage | None = None
+
+ def to_dict(self) -> dict:
+ result: dict = {
+ 'type': 'sstv_progress',
+ 'status': self.status,
+ 'progress': self.progress_percent,
+ }
+ if self.mode:
+ result['mode'] = self.mode
+ if self.message:
+ result['message'] = self.message
+ if self.image:
+ result['image'] = self.image.to_dict()
+ return result
+
+
+# ---------------------------------------------------------------------------
+# DopplerTracker
+# ---------------------------------------------------------------------------
+
+class DopplerTracker:
+ """Real-time Doppler shift calculator for satellite tracking.
+
+ Uses skyfield to calculate the range rate between observer and satellite,
+ then computes the Doppler-shifted receive frequency.
+ """
+
+ def __init__(self, satellite_name: str = 'ISS'):
+ self._satellite_name = satellite_name
+ self._observer_lat: float | None = None
+ self._observer_lon: float | None = None
+ self._satellite = None
+ self._observer = None
+ self._ts = None
+ self._enabled = False
+
+ def configure(self, latitude: float, longitude: float) -> bool:
+ """Configure the Doppler tracker with observer location."""
+ try:
+ from skyfield.api import EarthSatellite, load, wgs84
+
+ from data.satellites import TLE_SATELLITES
+
+ tle_data = TLE_SATELLITES.get(self._satellite_name)
+ if not tle_data:
+ logger.error(f"No TLE data for satellite: {self._satellite_name}")
+ return False
+
+ self._ts = load.timescale()
+ self._satellite = EarthSatellite(tle_data[1], tle_data[2], tle_data[0], self._ts)
+ self._observer = wgs84.latlon(latitude, longitude)
+ self._observer_lat = latitude
+ self._observer_lon = longitude
+ self._enabled = True
+
+ logger.info(f"Doppler tracker configured for {self._satellite_name} at ({latitude}, {longitude})")
+ return True
+
+ except ImportError:
+ logger.warning("skyfield not available - Doppler tracking disabled")
+ return False
+ except Exception as e:
+ logger.error(f"Failed to configure Doppler tracker: {e}")
+ return False
+
+ @property
+ def is_enabled(self) -> bool:
+ return self._enabled
+
+ def calculate(self, nominal_freq_mhz: float) -> DopplerInfo | None:
+ """Calculate current Doppler-shifted frequency."""
+ if not self._enabled or not self._satellite or not self._observer:
+ return None
+
+ try:
+ t = self._ts.now()
+ difference = self._satellite - self._observer
+ topocentric = difference.at(t)
+ alt, az, distance = topocentric.altaz()
+
+ dt_seconds = 1.0
+ t_future = self._ts.utc(t.utc_datetime() + timedelta(seconds=dt_seconds))
+ topocentric_future = difference.at(t_future)
+ _, _, distance_future = topocentric_future.altaz()
+
+ range_rate_km_s = (distance_future.km - distance.km) / dt_seconds
+ nominal_freq_hz = nominal_freq_mhz * 1_000_000
+ doppler_factor = 1 - (range_rate_km_s * 1000 / SPEED_OF_LIGHT)
+ corrected_freq_hz = nominal_freq_hz * doppler_factor
+ shift_hz = corrected_freq_hz - nominal_freq_hz
+
+ return DopplerInfo(
+ frequency_hz=corrected_freq_hz,
+ shift_hz=shift_hz,
+ range_rate_km_s=range_rate_km_s,
+ elevation=alt.degrees,
+ azimuth=az.degrees,
+ timestamp=datetime.now(timezone.utc)
+ )
+
+ except Exception as e:
+ logger.error(f"Doppler calculation failed: {e}")
+ return None
+
+
+# ---------------------------------------------------------------------------
+# SSTVDecoder
+# ---------------------------------------------------------------------------
+
+class SSTVDecoder:
+ """SSTV decoder using pure-Python DSP with Doppler compensation."""
+
+ RETUNE_THRESHOLD_HZ = 500
+ DOPPLER_UPDATE_INTERVAL = 5
+
+ def __init__(self, output_dir: str | Path | None = None, url_prefix: str = '/sstv'):
+ self._rtl_process = None
+ self._running = False
+ self._lock = threading.Lock()
+ self._callback: Callable[[DecodeProgress], None] | None = None
+ self._output_dir = Path(output_dir) if output_dir else Path('instance/sstv_images')
+ self._url_prefix = url_prefix
+ self._images: list[SSTVImage] = []
+ self._decode_thread = None
+ self._doppler_thread = None
+ self._frequency = ISS_SSTV_FREQ
+ self._modulation = 'fm'
+ self._current_tuned_freq_hz: int = 0
+ self._device_index = 0
+
+ # Doppler tracking
+ self._doppler_tracker = DopplerTracker('ISS')
+ self._doppler_enabled = False
+ self._last_doppler_info: DopplerInfo | None = None
+
+ # Ensure output directory exists
+ self._output_dir.mkdir(parents=True, exist_ok=True)
+
+ @property
+ def is_running(self) -> bool:
+ return self._running
+
+ @property
+ def decoder_available(self) -> str:
+ """Return name of available decoder. Always available with pure Python."""
+ return 'python-sstv'
+
+ def set_callback(self, callback: Callable[[DecodeProgress], None]) -> None:
+ """Set callback for decode progress updates."""
+ self._callback = callback
+
+ def start(
+ self,
+ frequency: float = ISS_SSTV_FREQ,
+ device_index: int = 0,
+ latitude: float | None = None,
+ longitude: float | None = None,
+ modulation: str = 'fm',
+ ) -> bool:
+ """Start SSTV decoder listening on specified frequency.
+
+ Args:
+ frequency: Frequency in MHz (default: 145.800 for ISS).
+ device_index: RTL-SDR device index.
+ latitude: Observer latitude for Doppler correction.
+ longitude: Observer longitude for Doppler correction.
+ modulation: Demodulation mode for rtl_fm (fm, usb, lsb).
+
+ Returns:
+ True if started successfully.
+ """
+ with self._lock:
+ if self._running:
+ return True
+
+ self._frequency = frequency
+ self._device_index = device_index
+ self._modulation = modulation
+
+ # Configure Doppler tracking if location provided
+ self._doppler_enabled = False
+ if latitude is not None and longitude is not None:
+ if self._doppler_tracker.configure(latitude, longitude):
+ self._doppler_enabled = True
+ logger.info(f"Doppler tracking enabled for location ({latitude}, {longitude})")
+ else:
+ logger.warning("Doppler tracking unavailable - using fixed frequency")
+
+ try:
+ freq_hz = self._get_doppler_corrected_freq_hz()
+ self._current_tuned_freq_hz = freq_hz
+ self._start_pipeline(freq_hz)
+ self._running = True
+
+ # Start Doppler tracking thread if enabled
+ if self._doppler_enabled:
+ self._doppler_thread = threading.Thread(
+ target=self._doppler_tracking_loop, daemon=True)
+ self._doppler_thread.start()
+ logger.info(f"SSTV decoder started on {frequency} MHz with Doppler tracking")
+ self._emit_progress(DecodeProgress(
+ status='detecting',
+ message=f'Listening on {frequency} MHz with Doppler tracking...'
+ ))
+ else:
+ logger.info(f"SSTV decoder started on {frequency} MHz (no Doppler tracking)")
+ self._emit_progress(DecodeProgress(
+ status='detecting',
+ message=f'Listening on {frequency} MHz...'
+ ))
+
+ return True
+
+ except Exception as e:
+ logger.error(f"Failed to start SSTV decoder: {e}")
+ self._emit_progress(DecodeProgress(
+ status='error',
+ message=str(e)
+ ))
+ return False
+
+ def _get_doppler_corrected_freq_hz(self) -> int:
+ """Get the Doppler-corrected frequency in Hz."""
+ nominal_freq_hz = int(self._frequency * 1_000_000)
+
+ if self._doppler_enabled:
+ doppler_info = self._doppler_tracker.calculate(self._frequency)
+ if doppler_info:
+ self._last_doppler_info = doppler_info
+ corrected_hz = int(doppler_info.frequency_hz)
+ logger.info(
+ f"Doppler correction: {doppler_info.shift_hz:+.1f} Hz "
+ f"(range rate: {doppler_info.range_rate_km_s:+.3f} km/s, "
+ f"el: {doppler_info.elevation:.1f}\u00b0)"
+ )
+ return corrected_hz
+
+ return nominal_freq_hz
+
+ def _start_pipeline(self, freq_hz: int) -> None:
+ """Start the rtl_fm -> Python decode pipeline."""
+ rtl_cmd = [
+ 'rtl_fm',
+ '-d', str(self._device_index),
+ '-f', str(freq_hz),
+ '-M', self._modulation,
+ '-s', str(SAMPLE_RATE),
+ '-r', str(SAMPLE_RATE),
+ '-l', '0', # No squelch
+ '-'
+ ]
+
+ logger.info(f"Starting rtl_fm: {' '.join(rtl_cmd)}")
+
+ self._rtl_process = subprocess.Popen(
+ rtl_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE
+ )
+
+ # Start decode thread that reads from rtl_fm stdout
+ self._decode_thread = threading.Thread(
+ target=self._decode_audio_stream, daemon=True)
+ self._decode_thread.start()
+
+ def _decode_audio_stream(self) -> None:
+ """Read audio from rtl_fm and decode SSTV images.
+
+ Runs in a background thread. Reads 100ms chunks of int16 PCM,
+ feeds through VIS detector, then image decoder.
+ """
+ chunk_bytes = SAMPLE_RATE // 10 * 2 # 100ms of int16 = 9600 bytes
+ vis_detector = VISDetector(sample_rate=SAMPLE_RATE)
+ image_decoder: SSTVImageDecoder | None = None
+ current_mode_name: str | None = None
+
+ logger.info("Audio decode thread started")
+
+ while self._running and self._rtl_process:
+ try:
+ raw_data = self._rtl_process.stdout.read(chunk_bytes)
+ if not raw_data:
+ if self._running:
+ logger.warning("rtl_fm stream ended unexpectedly")
+ break
+
+ # Convert int16 PCM to float64
+ n_samples = len(raw_data) // 2
+ if n_samples == 0:
+ continue
+ raw_samples = np.frombuffer(raw_data[:n_samples * 2], dtype=np.int16)
+ samples = normalize_audio(raw_samples)
+
+ if image_decoder is not None:
+ # Currently decoding an image
+ complete = image_decoder.feed(samples)
+
+ # Emit progress
+ self._emit_progress(DecodeProgress(
+ status='decoding',
+ mode=current_mode_name,
+ progress_percent=image_decoder.progress_percent,
+ message=f'Decoding {current_mode_name}: {image_decoder.progress_percent}%'
+ ))
+
+ if complete:
+ # Save image
+ self._save_decoded_image(image_decoder, current_mode_name)
+ image_decoder = None
+ current_mode_name = None
+ vis_detector.reset()
+ else:
+ # Scanning for VIS header
+ result = vis_detector.feed(samples)
+ if result is not None:
+ vis_code, mode_name = result
+ logger.info(f"VIS detected: code={vis_code}, mode={mode_name}")
+
+ mode_spec = get_mode(vis_code)
+ if mode_spec:
+ current_mode_name = mode_name
+ image_decoder = SSTVImageDecoder(
+ mode_spec,
+ sample_rate=SAMPLE_RATE,
+ )
+ self._emit_progress(DecodeProgress(
+ status='decoding',
+ mode=mode_name,
+ progress_percent=0,
+ message=f'Detected {mode_name} - decoding...'
+ ))
+ else:
+ logger.warning(f"No mode spec for VIS code {vis_code}")
+ vis_detector.reset()
+
+ except Exception as e:
+ logger.error(f"Error in decode thread: {e}")
+ if not self._running:
+ break
+ time.sleep(0.1)
+
+ logger.info("Audio decode thread stopped")
+
+ def _save_decoded_image(self, decoder: SSTVImageDecoder,
+ mode_name: str | None) -> None:
+ """Save a completed decoded image to disk."""
+ try:
+ img = decoder.get_image()
+ if img is None:
+ logger.error("Failed to get image from decoder (Pillow not available?)")
+ self._emit_progress(DecodeProgress(
+ status='error',
+ message='Failed to create image - Pillow not installed'
+ ))
+ return
+
+ timestamp = datetime.now(timezone.utc)
+ filename = f"sstv_{timestamp.strftime('%Y%m%d_%H%M%S')}_{mode_name or 'unknown'}.png"
+ filepath = self._output_dir / filename
+ img.save(filepath, 'PNG')
+
+ sstv_image = SSTVImage(
+ filename=filename,
+ path=filepath,
+ mode=mode_name or 'Unknown',
+ timestamp=timestamp,
+ frequency=self._frequency,
+ size_bytes=filepath.stat().st_size,
+ url_prefix=self._url_prefix,
+ )
+ self._images.append(sstv_image)
+
+ logger.info(f"SSTV image saved: {filename} ({sstv_image.size_bytes} bytes)")
+ self._emit_progress(DecodeProgress(
+ status='complete',
+ mode=mode_name,
+ progress_percent=100,
+ message='Image decoded',
+ image=sstv_image,
+ ))
+
+ except Exception as e:
+ logger.error(f"Error saving decoded image: {e}")
+ self._emit_progress(DecodeProgress(
+ status='error',
+ message=f'Error saving image: {e}'
+ ))
+
+ def _doppler_tracking_loop(self) -> None:
+ """Background thread that monitors Doppler shift and retunes when needed."""
+ logger.info("Doppler tracking thread started")
+
+ while self._running and self._doppler_enabled:
+ time.sleep(self.DOPPLER_UPDATE_INTERVAL)
+
+ if not self._running:
+ break
+
+ try:
+ doppler_info = self._doppler_tracker.calculate(self._frequency)
+ if not doppler_info:
+ continue
+
+ self._last_doppler_info = doppler_info
+ new_freq_hz = int(doppler_info.frequency_hz)
+ freq_diff = abs(new_freq_hz - self._current_tuned_freq_hz)
+
+ logger.debug(
+ f"Doppler: {doppler_info.shift_hz:+.1f} Hz, "
+ f"el: {doppler_info.elevation:.1f}\u00b0, "
+ f"diff from tuned: {freq_diff} Hz"
+ )
+
+ self._emit_progress(DecodeProgress(
+ status='detecting',
+ message=f'Doppler: {doppler_info.shift_hz:+.0f} Hz, elevation: {doppler_info.elevation:.1f}\u00b0'
+ ))
+
+ if freq_diff >= self.RETUNE_THRESHOLD_HZ:
+ logger.info(
+ f"Retuning: {self._current_tuned_freq_hz} -> {new_freq_hz} Hz "
+ f"(Doppler shift: {doppler_info.shift_hz:+.1f} Hz)"
+ )
+ self._retune_rtl_fm(new_freq_hz)
+
+ except Exception as e:
+ logger.error(f"Doppler tracking error: {e}")
+
+ logger.info("Doppler tracking thread stopped")
+
+ def _retune_rtl_fm(self, new_freq_hz: int) -> None:
+ """Retune rtl_fm to a new frequency by restarting the process."""
+ with self._lock:
+ if not self._running:
+ return
+
+ if self._rtl_process:
+ try:
+ self._rtl_process.terminate()
+ self._rtl_process.wait(timeout=2)
+ except Exception:
+ with contextlib.suppress(Exception):
+ self._rtl_process.kill()
+
+ rtl_cmd = [
+ 'rtl_fm',
+ '-d', str(self._device_index),
+ '-f', str(new_freq_hz),
+ '-M', self._modulation,
+ '-s', str(SAMPLE_RATE),
+ '-r', str(SAMPLE_RATE),
+ '-l', '0',
+ '-'
+ ]
+
+ logger.debug(f"Restarting rtl_fm: {' '.join(rtl_cmd)}")
+
+ self._rtl_process = subprocess.Popen(
+ rtl_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE
+ )
+
+ self._current_tuned_freq_hz = new_freq_hz
+
+ @property
+ def last_doppler_info(self) -> DopplerInfo | None:
+ """Get the most recent Doppler calculation."""
+ return self._last_doppler_info
+
+ @property
+ def doppler_enabled(self) -> bool:
+ """Check if Doppler tracking is enabled."""
+ return self._doppler_enabled
+
+ def stop(self) -> None:
+ """Stop SSTV decoder."""
+ with self._lock:
+ self._running = False
+
+ if self._rtl_process:
+ try:
+ self._rtl_process.terminate()
+ self._rtl_process.wait(timeout=5)
+ except Exception:
+ with contextlib.suppress(Exception):
+ self._rtl_process.kill()
+ self._rtl_process = None
+
+ logger.info("SSTV decoder stopped")
+
+ def get_images(self) -> list[SSTVImage]:
+ """Get list of decoded images."""
+ self._scan_images()
+ return list(self._images)
+
+ def _scan_images(self) -> None:
+ """Scan output directory for images."""
+ known_filenames = {img.filename for img in self._images}
+
+ for filepath in self._output_dir.glob('*.png'):
+ if filepath.name not in known_filenames:
+ try:
+ stat = filepath.stat()
+ image = SSTVImage(
+ filename=filepath.name,
+ path=filepath,
+ mode='Unknown',
+ timestamp=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
+ frequency=self._frequency,
+ size_bytes=stat.st_size,
+ url_prefix=self._url_prefix,
+ )
+ self._images.append(image)
+ except Exception as e:
+ logger.warning(f"Error scanning image {filepath}: {e}")
+
+ def _emit_progress(self, progress: DecodeProgress) -> None:
+ """Emit progress update to callback."""
+ if self._callback:
+ try:
+ self._callback(progress)
+ except Exception as e:
+ logger.error(f"Error in progress callback: {e}")
+
+ def decode_file(self, audio_path: str | Path) -> list[SSTVImage]:
+ """Decode SSTV image(s) from an audio file.
+
+ Reads a WAV file and processes it through VIS detection + image
+ decoding using the pure Python pipeline.
+
+ Args:
+ audio_path: Path to WAV audio file.
+
+ Returns:
+ List of decoded images.
+ """
+ import wave
+
+ audio_path = Path(audio_path)
+ if not audio_path.exists():
+ raise FileNotFoundError(f"Audio file not found: {audio_path}")
+
+ images: list[SSTVImage] = []
+
+ try:
+ with wave.open(str(audio_path), 'rb') as wf:
+ n_channels = wf.getnchannels()
+ sample_width = wf.getsampwidth()
+ file_sample_rate = wf.getframerate()
+ n_frames = wf.getnframes()
+
+ logger.info(
+ f"Decoding WAV: {n_channels}ch, {sample_width*8}bit, "
+ f"{file_sample_rate}Hz, {n_frames} frames"
+ )
+
+ # Read all audio data
+ raw_data = wf.readframes(n_frames)
+
+ # Convert to float64 mono
+ if sample_width == 2:
+ audio = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64) / 32768.0
+ elif sample_width == 1:
+ audio = np.frombuffer(raw_data, dtype=np.uint8).astype(np.float64) / 128.0 - 1.0
+ elif sample_width == 4:
+ audio = np.frombuffer(raw_data, dtype=np.int32).astype(np.float64) / 2147483648.0
+ else:
+ raise ValueError(f"Unsupported sample width: {sample_width}")
+
+ # If stereo, take left channel
+ if n_channels > 1:
+ audio = audio[::n_channels]
+
+ # Resample if needed
+ if file_sample_rate != SAMPLE_RATE:
+ audio = self._resample(audio, file_sample_rate, SAMPLE_RATE)
+
+ # Process through VIS detector + image decoder
+ vis_detector = VISDetector(sample_rate=SAMPLE_RATE)
+ image_decoder: SSTVImageDecoder | None = None
+ current_mode_name: str | None = None
+
+ chunk_size = SAMPLE_RATE // 10 # 100ms chunks
+ offset = 0
+
+ while offset < len(audio):
+ chunk = audio[offset:offset + chunk_size]
+ offset += chunk_size
+
+ if image_decoder is not None:
+ complete = image_decoder.feed(chunk)
+ if complete:
+ img = image_decoder.get_image()
+ if img is not None:
+ timestamp = datetime.now(timezone.utc)
+ filename = f"sstv_{timestamp.strftime('%Y%m%d_%H%M%S')}_{current_mode_name or 'unknown'}.png"
+ filepath = self._output_dir / filename
+ img.save(filepath, 'PNG')
+
+ sstv_image = SSTVImage(
+ filename=filename,
+ path=filepath,
+ mode=current_mode_name or 'Unknown',
+ timestamp=timestamp,
+ frequency=0,
+ size_bytes=filepath.stat().st_size,
+ url_prefix=self._url_prefix,
+ )
+ images.append(sstv_image)
+ self._images.append(sstv_image)
+ logger.info(f"Decoded image from file: {filename}")
+
+ image_decoder = None
+ current_mode_name = None
+ vis_detector.reset()
+ else:
+ result = vis_detector.feed(chunk)
+ if result is not None:
+ vis_code, mode_name = result
+ logger.info(f"VIS detected in file: code={vis_code}, mode={mode_name}")
+
+ mode_spec = get_mode(vis_code)
+ if mode_spec:
+ current_mode_name = mode_name
+ image_decoder = SSTVImageDecoder(
+ mode_spec,
+ sample_rate=SAMPLE_RATE,
+ )
+ else:
+ vis_detector.reset()
+
+ except wave.Error as e:
+ logger.error(f"Error reading WAV file: {e}")
+ raise
+ except Exception as e:
+ logger.error(f"Error decoding audio file: {e}")
+ raise
+
+ return images
+
+ @staticmethod
+ def _resample(audio: np.ndarray, from_rate: int, to_rate: int) -> np.ndarray:
+ """Simple resampling using linear interpolation."""
+ if from_rate == to_rate:
+ return audio
+
+ ratio = to_rate / from_rate
+ new_length = int(len(audio) * ratio)
+ indices = np.linspace(0, len(audio) - 1, new_length)
+ return np.interp(indices, np.arange(len(audio)), audio)
+
+
+# ---------------------------------------------------------------------------
+# Module-level singletons
+# ---------------------------------------------------------------------------
+
+_decoder: SSTVDecoder | None = None
+
+
+def get_sstv_decoder() -> SSTVDecoder:
+ """Get or create the global SSTV decoder instance."""
+ global _decoder
+ if _decoder is None:
+ _decoder = SSTVDecoder()
+ return _decoder
+
+
+def is_sstv_available() -> bool:
+ """Check if SSTV decoding is available.
+
+ Always True with the pure-Python decoder (requires only numpy/Pillow).
+ """
+ return True
+
+
+_general_decoder: SSTVDecoder | None = None
+
+
+def get_general_sstv_decoder() -> SSTVDecoder:
+ """Get or create the global general SSTV decoder instance."""
+ global _general_decoder
+ if _general_decoder is None:
+ _general_decoder = SSTVDecoder(
+ output_dir='instance/sstv_general_images',
+ url_prefix='/sstv-general',
+ )
+ return _general_decoder
diff --git a/utils/sstv/vis.py b/utils/sstv/vis.py
new file mode 100644
index 0000000..b250647
--- /dev/null
+++ b/utils/sstv/vis.py
@@ -0,0 +1,318 @@
+"""VIS (Vertical Interval Signaling) header detection.
+
+State machine that processes audio samples to detect the VIS header
+that precedes every SSTV image transmission. The VIS header identifies
+the SSTV mode (Robot36, Martin1, etc.) via an 8-bit code with even parity.
+
+VIS header structure:
+ Leader tone (1900 Hz, ~300ms)
+ Break (1200 Hz, ~10ms)
+ Leader tone (1900 Hz, ~300ms)
+ Start bit (1200 Hz, 30ms)
+ 8 data bits (1100 Hz = 1, 1300 Hz = 0, 30ms each)
+ Parity bit (even parity, 30ms)
+ Stop bit (1200 Hz, 30ms)
+"""
+
+from __future__ import annotations
+
+import enum
+
+import numpy as np
+
+from .constants import (
+ FREQ_LEADER,
+ FREQ_SYNC,
+ FREQ_VIS_BIT_0,
+ FREQ_VIS_BIT_1,
+ SAMPLE_RATE,
+ VIS_BIT_DURATION,
+ VIS_CODES,
+ VIS_LEADER_MAX,
+ VIS_LEADER_MIN,
+)
+from .dsp import goertzel, samples_for_duration
+
+# Use 10ms window (480 samples at 48kHz) for 100Hz frequency resolution.
+# This cleanly separates 1100, 1200, 1300, 1500, 1900, 2300 Hz tones.
+VIS_WINDOW = 480
+
+
+class VISState(enum.Enum):
+ """States of the VIS detection state machine."""
+ IDLE = 'idle'
+ LEADER_1 = 'leader_1'
+ BREAK = 'break'
+ LEADER_2 = 'leader_2'
+ START_BIT = 'start_bit'
+ DATA_BITS = 'data_bits'
+ PARITY = 'parity'
+ STOP_BIT = 'stop_bit'
+ DETECTED = 'detected'
+
+
+# The four tone classes we need to distinguish in VIS detection.
+_VIS_FREQS = [FREQ_VIS_BIT_1, FREQ_SYNC, FREQ_VIS_BIT_0, FREQ_LEADER]
+# 1100, 1200, 1300, 1900 Hz
+
+
+def _classify_tone(samples: np.ndarray,
+ sample_rate: int = SAMPLE_RATE) -> float | None:
+ """Classify which VIS tone is present in the given samples.
+
+ Computes Goertzel energy at each of the four VIS frequencies and returns
+ the one with the highest energy, provided it dominates sufficiently.
+
+ Returns:
+ The detected frequency (1100, 1200, 1300, or 1900), or None.
+ """
+ if len(samples) < 16:
+ return None
+
+ energies = {f: goertzel(samples, f, sample_rate) for f in _VIS_FREQS}
+ best_freq = max(energies, key=energies.get) # type: ignore[arg-type]
+ best_energy = energies[best_freq]
+
+ if best_energy <= 0:
+ return None
+
+ # Require the best frequency to be at least 2x stronger than the
+ # next-strongest tone.
+ others = sorted(
+ [e for f, e in energies.items() if f != best_freq], reverse=True)
+ second_best = others[0] if others else 0.0
+
+ if second_best > 0 and best_energy / second_best < 2.0:
+ return None
+
+ return best_freq
+
+
+class VISDetector:
+ """VIS header detection state machine.
+
+ Feed audio samples via ``feed()`` and it returns the detected VIS code
+ (and mode name) when a valid header is found.
+
+ The state machine uses a simple approach:
+
+ - **Leader detection**: Count consecutive 1900 Hz windows until minimum
+ leader duration is met.
+ - **Break/start bit**: Count consecutive 1200 Hz windows. The break is
+ short; the start bit is one VIS bit duration.
+ - **Data/parity bits**: Accumulate audio for one bit duration, then
+ compare 1100 vs 1300 Hz energy to determine bit value.
+ - **Stop bit**: Count 1200 Hz windows for one bit duration.
+
+ Usage::
+
+ detector = VISDetector()
+ for chunk in audio_chunks:
+ result = detector.feed(chunk)
+ if result is not None:
+ vis_code, mode_name = result
+ """
+
+ def __init__(self, sample_rate: int = SAMPLE_RATE):
+ self._sample_rate = sample_rate
+ self._window = VIS_WINDOW
+ self._bit_samples = samples_for_duration(VIS_BIT_DURATION, sample_rate)
+ self._leader_min_samples = samples_for_duration(VIS_LEADER_MIN, sample_rate)
+ self._leader_max_samples = samples_for_duration(VIS_LEADER_MAX, sample_rate)
+
+ # Pre-calculate window counts
+ self._leader_min_windows = max(1, self._leader_min_samples // self._window)
+ self._leader_max_windows = max(1, self._leader_max_samples // self._window)
+ self._bit_windows = max(1, self._bit_samples // self._window)
+
+ self._state = VISState.IDLE
+ self._buffer = np.array([], dtype=np.float64)
+ self._tone_counter = 0
+ self._data_bits: list[int] = []
+ self._parity_bit: int = 0
+ self._bit_accumulator: list[np.ndarray] = []
+
+ def reset(self) -> None:
+ """Reset the detector to scan for a new VIS header."""
+ self._state = VISState.IDLE
+ self._buffer = np.array([], dtype=np.float64)
+ self._tone_counter = 0
+ self._data_bits = []
+ self._parity_bit = 0
+ self._bit_accumulator = []
+
+ @property
+ def state(self) -> VISState:
+ return self._state
+
+ def feed(self, samples: np.ndarray) -> tuple[int, str] | None:
+ """Feed audio samples and attempt VIS detection.
+
+ Args:
+ samples: Float64 audio samples (normalized to -1..1).
+
+ Returns:
+ (vis_code, mode_name) tuple when a valid VIS header is detected,
+ or None if still scanning.
+ """
+ self._buffer = np.concatenate([self._buffer, samples])
+
+ while len(self._buffer) >= self._window:
+ result = self._process_window(self._buffer[:self._window])
+ self._buffer = self._buffer[self._window:]
+
+ if result is not None:
+ return result
+
+ return None
+
+ def _process_window(self, window: np.ndarray) -> tuple[int, str] | None:
+ """Process a single analysis window through the state machine.
+
+ The key design: when a state transition occurs due to a tone change,
+ the window that triggers the transition counts as the first window
+ of the new state (tone_counter = 1).
+ """
+ tone = _classify_tone(window, self._sample_rate)
+
+ if self._state == VISState.IDLE:
+ if tone == FREQ_LEADER:
+ self._tone_counter += 1
+ if self._tone_counter >= self._leader_min_windows:
+ self._state = VISState.LEADER_1
+ else:
+ self._tone_counter = 0
+
+ elif self._state == VISState.LEADER_1:
+ if tone == FREQ_LEADER:
+ self._tone_counter += 1
+ if self._tone_counter > self._leader_max_windows * 3:
+ self._tone_counter = 0
+ self._state = VISState.IDLE
+ elif tone == FREQ_SYNC:
+ # Transition to BREAK; this window counts as break window 1
+ self._tone_counter = 1
+ self._state = VISState.BREAK
+ else:
+ self._tone_counter = 0
+ self._state = VISState.IDLE
+
+ elif self._state == VISState.BREAK:
+ if tone == FREQ_SYNC:
+ self._tone_counter += 1
+ if self._tone_counter > 10:
+ self._tone_counter = 0
+ self._state = VISState.IDLE
+ elif tone == FREQ_LEADER:
+ # Transition to LEADER_2; this window counts
+ self._tone_counter = 1
+ self._state = VISState.LEADER_2
+ else:
+ self._tone_counter = 0
+ self._state = VISState.IDLE
+
+ elif self._state == VISState.LEADER_2:
+ if tone == FREQ_LEADER:
+ self._tone_counter += 1
+ if self._tone_counter > self._leader_max_windows * 3:
+ self._tone_counter = 0
+ self._state = VISState.IDLE
+ elif tone == FREQ_SYNC:
+ # Transition to START_BIT; this window counts
+ self._tone_counter = 1
+ self._state = VISState.START_BIT
+ # Check if start bit is already complete (1-window bit)
+ if self._tone_counter >= self._bit_windows:
+ self._tone_counter = 0
+ self._data_bits = []
+ self._bit_accumulator = []
+ self._state = VISState.DATA_BITS
+ else:
+ self._tone_counter = 0
+ self._state = VISState.IDLE
+
+ elif self._state == VISState.START_BIT:
+ if tone == FREQ_SYNC:
+ self._tone_counter += 1
+ if self._tone_counter >= self._bit_windows:
+ self._tone_counter = 0
+ self._data_bits = []
+ self._bit_accumulator = []
+ self._state = VISState.DATA_BITS
+ else:
+ # Non-sync during start bit: check if we had enough sync
+ # windows already (tolerant: accept if within 1 window)
+ if self._tone_counter >= self._bit_windows - 1:
+ # Close enough - accept and process this window as data
+ self._data_bits = []
+ self._bit_accumulator = [window]
+ self._tone_counter = 1
+ self._state = VISState.DATA_BITS
+ else:
+ self._tone_counter = 0
+ self._state = VISState.IDLE
+
+ elif self._state == VISState.DATA_BITS:
+ self._tone_counter += 1
+ self._bit_accumulator.append(window)
+
+ if self._tone_counter >= self._bit_windows:
+ bit_audio = np.concatenate(self._bit_accumulator)
+ bit_val = self._decode_bit(bit_audio)
+ self._data_bits.append(bit_val)
+ self._tone_counter = 0
+ self._bit_accumulator = []
+
+ if len(self._data_bits) == 8:
+ self._state = VISState.PARITY
+
+ elif self._state == VISState.PARITY:
+ self._tone_counter += 1
+ self._bit_accumulator.append(window)
+
+ if self._tone_counter >= self._bit_windows:
+ bit_audio = np.concatenate(self._bit_accumulator)
+ self._parity_bit = self._decode_bit(bit_audio)
+ self._tone_counter = 0
+ self._bit_accumulator = []
+ self._state = VISState.STOP_BIT
+
+ elif self._state == VISState.STOP_BIT:
+ self._tone_counter += 1
+
+ if self._tone_counter >= self._bit_windows:
+ result = self._validate_and_decode()
+ self.reset()
+ return result
+
+ return None
+
+ def _decode_bit(self, samples: np.ndarray) -> int:
+ """Decode a single VIS data bit from its audio samples.
+
+ Compares Goertzel energy at 1100 Hz (bit=1) vs 1300 Hz (bit=0).
+ """
+ e1 = goertzel(samples, FREQ_VIS_BIT_1, self._sample_rate)
+ e0 = goertzel(samples, FREQ_VIS_BIT_0, self._sample_rate)
+ return 1 if e1 > e0 else 0
+
+ def _validate_and_decode(self) -> tuple[int, str] | None:
+ """Validate parity and decode the VIS code.
+
+ Returns:
+ (vis_code, mode_name) or None if validation fails.
+ """
+ if len(self._data_bits) != 8:
+ return None
+
+ # Decode VIS code (LSB first)
+ vis_code = 0
+ for i, bit in enumerate(self._data_bits):
+ vis_code |= bit << i
+
+ # Look up mode
+ mode_name = VIS_CODES.get(vis_code)
+ if mode_name is not None:
+ return vis_code, mode_name
+
+ return None