Add real-time signal scope to both SSTV modes

Adds a phosphor-persistence waveform scope showing audio RMS/peak
levels during ISS SSTV and General SSTV decoding, matching the
existing pager scope pattern with a purple color scheme.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-08 00:28:33 +00:00
parent 92e5e7c6da
commit 766a51753d
6 changed files with 381 additions and 24 deletions

View File

@@ -225,7 +225,7 @@ class SSTVDecoder:
self._rtl_process = None
self._running = False
self._lock = threading.Lock()
self._callback: Callable[[DecodeProgress], None] | None = None
self._callback: Callable[[dict], None] | None = None
self._output_dir = Path(output_dir) if output_dir else Path('instance/sstv_images')
self._url_prefix = url_prefix
self._images: list[SSTVImage] = []
@@ -253,7 +253,7 @@ class SSTVDecoder:
"""Return name of available decoder. Always available with pure Python."""
return 'python-sstv'
def set_callback(self, callback: Callable[[DecodeProgress], None]) -> None:
def set_callback(self, callback: Callable[[dict], None]) -> None:
"""Set callback for decode progress updates."""
self._callback = callback
@@ -420,6 +420,10 @@ class SSTVDecoder:
chunk_counter += 1
# Scope: compute RMS/peak from raw int16 samples every chunk
rms_val = int(np.sqrt(np.mean(raw_samples.astype(np.float64) ** 2)))
peak_val = int(np.max(np.abs(raw_samples)))
if image_decoder is not None:
# Currently decoding an image
complete = image_decoder.feed(samples)
@@ -447,6 +451,7 @@ class SSTVDecoder:
message=f'Decoding {current_mode_name}: {pct}%',
partial_image=partial_url,
))
self._emit_scope(rms_val, peak_val, 'decoding')
if complete:
# Save image
@@ -479,6 +484,7 @@ class SSTVDecoder:
vis_detector.reset()
# Emit signal level metrics every ~500ms (every 5th 100ms chunk)
scope_tone: str | None = None
if chunk_counter % 5 == 0 and image_decoder is None:
rms = float(np.sqrt(np.mean(samples ** 2)))
signal_level = min(100, int(rms * 500))
@@ -501,6 +507,8 @@ class SSTVDecoder:
else:
sstv_tone = None
scope_tone = sstv_tone
self._emit_progress(DecodeProgress(
status='detecting',
message='Listening...',
@@ -509,6 +517,8 @@ class SSTVDecoder:
vis_state=vis_detector.state.value,
))
self._emit_scope(rms_val, peak_val, scope_tone)
except Exception as e:
logger.error(f"Error in decode thread: {e}")
if not self._running:
@@ -736,10 +746,18 @@ class SSTVDecoder:
"""Emit progress update to callback."""
if self._callback:
try:
self._callback(progress)
self._callback(progress.to_dict())
except Exception as e:
logger.error(f"Error in progress callback: {e}")
def _emit_scope(self, rms: int, peak: int, tone: str | None = None) -> None:
"""Emit scope signal levels to callback."""
if self._callback:
try:
self._callback({'type': 'sstv_scope', 'rms': rms, 'peak': peak, 'tone': tone})
except Exception:
pass
def decode_file(self, audio_path: str | Path) -> list[SSTVImage]:
"""Decode SSTV image(s) from an audio file.