mirror of
https://github.com/smittix/intercept.git
synced 2026-06-17 09:59:47 -07:00
Add WeFax (Weather Fax) decoder mode
Implement HF radiofax decoding with custom Python DSP pipeline (rtl_fm USB → Goertzel/Hilbert demodulation), 33-station database with broadcast schedules, audio waveform scope, live image preview, and decoded image gallery. Amber/gold UI theme for HF distinction. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+754
@@ -0,0 +1,754 @@
|
||||
"""WeFax (Weather Fax) decoder.
|
||||
|
||||
Decodes HF radiofax (weather fax) transmissions using RTL-SDR direct
|
||||
sampling mode. The decoder implements the standard WeFax AM protocol:
|
||||
carrier 1900 Hz, deviation +/-400 Hz (black=1500, white=2300).
|
||||
|
||||
Pipeline: rtl_fm -M usb -E direct2 -> stdout PCM -> Python DSP state machine
|
||||
|
||||
State machine: SCANNING -> PHASING -> RECEIVING -> COMPLETE
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import contextlib
|
||||
import io
|
||||
import math
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
import numpy as np
|
||||
|
||||
from utils.logging import get_logger
|
||||
|
||||
logger = get_logger('intercept.wefax')
|
||||
|
||||
try:
|
||||
from PIL import Image as PILImage
|
||||
except ImportError:
|
||||
PILImage = None # type: ignore[assignment,misc]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WeFax protocol constants
|
||||
# ---------------------------------------------------------------------------
|
||||
CARRIER_FREQ = 1900.0 # Hz - center/carrier
|
||||
BLACK_FREQ = 1500.0 # Hz - black level
|
||||
WHITE_FREQ = 2300.0 # Hz - white level
|
||||
START_TONE_FREQ = 300.0 # Hz - start tone
|
||||
STOP_TONE_FREQ = 450.0 # Hz - stop tone
|
||||
PHASING_FREQ = WHITE_FREQ # White pulse during phasing
|
||||
|
||||
START_TONE_DURATION = 3.0 # Minimum seconds of start tone to detect
|
||||
STOP_TONE_DURATION = 3.0 # Minimum seconds of stop tone to detect
|
||||
PHASING_MIN_LINES = 5 # Minimum phasing lines before image
|
||||
|
||||
DEFAULT_SAMPLE_RATE = 22050
|
||||
DEFAULT_IOC = 576
|
||||
DEFAULT_LPM = 120
|
||||
|
||||
|
||||
class DecoderState(Enum):
|
||||
"""WeFax decoder state machine states."""
|
||||
SCANNING = 'scanning'
|
||||
START_DETECTED = 'start_detected'
|
||||
PHASING = 'phasing'
|
||||
RECEIVING = 'receiving'
|
||||
COMPLETE = 'complete'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dataclasses
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class WeFaxImage:
|
||||
"""Decoded WeFax image metadata."""
|
||||
filename: str
|
||||
path: Path
|
||||
station: str
|
||||
frequency_khz: float
|
||||
timestamp: datetime
|
||||
ioc: int
|
||||
lpm: int
|
||||
size_bytes: int = 0
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
'filename': self.filename,
|
||||
'path': str(self.path),
|
||||
'station': self.station,
|
||||
'frequency_khz': self.frequency_khz,
|
||||
'timestamp': self.timestamp.isoformat(),
|
||||
'ioc': self.ioc,
|
||||
'lpm': self.lpm,
|
||||
'size_bytes': self.size_bytes,
|
||||
'url': f'/wefax/images/{self.filename}',
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class WeFaxProgress:
|
||||
"""WeFax decode progress update for SSE streaming."""
|
||||
status: str # 'scanning', 'phasing', 'receiving', 'complete', 'error', 'stopped'
|
||||
station: str = ''
|
||||
message: str = ''
|
||||
progress_percent: int = 0
|
||||
line_count: int = 0
|
||||
image: WeFaxImage | None = None
|
||||
partial_image: str | None = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
result: dict = {
|
||||
'type': 'wefax_progress',
|
||||
'status': self.status,
|
||||
'progress': self.progress_percent,
|
||||
}
|
||||
if self.station:
|
||||
result['station'] = self.station
|
||||
if self.message:
|
||||
result['message'] = self.message
|
||||
if self.line_count:
|
||||
result['line_count'] = self.line_count
|
||||
if self.image:
|
||||
result['image'] = self.image.to_dict()
|
||||
if self.partial_image:
|
||||
result['partial_image'] = self.partial_image
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DSP helpers (reuse Goertzel from SSTV where sensible)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _goertzel_mag(samples: np.ndarray, target_freq: float,
|
||||
sample_rate: int) -> float:
|
||||
"""Compute Goertzel magnitude at a single frequency."""
|
||||
n = len(samples)
|
||||
if n == 0:
|
||||
return 0.0
|
||||
w = 2.0 * math.pi * target_freq / sample_rate
|
||||
coeff = 2.0 * math.cos(w)
|
||||
s1 = 0.0
|
||||
s2 = 0.0
|
||||
for sample in samples:
|
||||
s0 = float(sample) + coeff * s1 - s2
|
||||
s2 = s1
|
||||
s1 = s0
|
||||
energy = s1 * s1 + s2 * s2 - coeff * s1 * s2
|
||||
return math.sqrt(max(0.0, energy))
|
||||
|
||||
|
||||
def _freq_to_pixel(frequency: float) -> int:
|
||||
"""Map WeFax audio frequency to pixel value (0=black, 255=white).
|
||||
|
||||
Linear mapping: 1500 Hz -> 0 (black), 2300 Hz -> 255 (white).
|
||||
"""
|
||||
normalized = (frequency - BLACK_FREQ) / (WHITE_FREQ - BLACK_FREQ)
|
||||
return max(0, min(255, int(normalized * 255 + 0.5)))
|
||||
|
||||
|
||||
def _estimate_frequency(samples: np.ndarray, sample_rate: int,
|
||||
freq_low: float = 1200.0,
|
||||
freq_high: float = 2500.0) -> float:
|
||||
"""Estimate dominant frequency using coarse+fine Goertzel sweep."""
|
||||
if len(samples) == 0:
|
||||
return 0.0
|
||||
|
||||
best_freq = freq_low
|
||||
best_energy = 0.0
|
||||
|
||||
# Coarse sweep (25 Hz steps)
|
||||
freq = freq_low
|
||||
while freq <= freq_high:
|
||||
energy = _goertzel_mag(samples, freq, sample_rate) ** 2
|
||||
if energy > best_energy:
|
||||
best_energy = energy
|
||||
best_freq = freq
|
||||
freq += 25.0
|
||||
|
||||
# Fine sweep around peak (+/- 25 Hz, 5 Hz steps)
|
||||
fine_low = max(freq_low, best_freq - 25.0)
|
||||
fine_high = min(freq_high, best_freq + 25.0)
|
||||
freq = fine_low
|
||||
while freq <= fine_high:
|
||||
energy = _goertzel_mag(samples, freq, sample_rate) ** 2
|
||||
if energy > best_energy:
|
||||
best_energy = energy
|
||||
best_freq = freq
|
||||
freq += 5.0
|
||||
|
||||
return best_freq
|
||||
|
||||
|
||||
def _detect_tone(samples: np.ndarray, target_freq: float,
|
||||
sample_rate: int, threshold: float = 3.0) -> bool:
|
||||
"""Detect if a specific tone dominates the signal."""
|
||||
target_mag = _goertzel_mag(samples, target_freq, sample_rate)
|
||||
# Check against a few reference frequencies
|
||||
refs = [1000.0, 1500.0, 1900.0, 2300.0]
|
||||
refs = [f for f in refs if abs(f - target_freq) > 100]
|
||||
if not refs:
|
||||
return target_mag > 0.01
|
||||
avg_ref = sum(_goertzel_mag(samples, f, sample_rate) for f in refs) / len(refs)
|
||||
if avg_ref <= 0:
|
||||
return target_mag > 0.01
|
||||
return target_mag / avg_ref >= threshold
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WeFaxDecoder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class WeFaxDecoder:
|
||||
"""WeFax decoder singleton.
|
||||
|
||||
Manages rtl_fm subprocess and decodes WeFax images using a state
|
||||
machine that detects start/stop tones, phasing signals, and
|
||||
demodulates image lines.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._rtl_process: subprocess.Popen | None = None
|
||||
self._running = False
|
||||
self._lock = threading.Lock()
|
||||
self._callback: Callable[[dict], None] | None = None
|
||||
self._last_scope_time: float = 0.0
|
||||
self._output_dir = Path('instance/wefax_images')
|
||||
self._images: list[WeFaxImage] = []
|
||||
self._decode_thread: threading.Thread | None = None
|
||||
|
||||
# Current session parameters
|
||||
self._station = ''
|
||||
self._frequency_khz = 0.0
|
||||
self._ioc = DEFAULT_IOC
|
||||
self._lpm = DEFAULT_LPM
|
||||
self._sample_rate = DEFAULT_SAMPLE_RATE
|
||||
self._device_index = 0
|
||||
self._gain = 40.0
|
||||
self._direct_sampling = True
|
||||
|
||||
self._output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
|
||||
def set_callback(self, callback: Callable[[dict], None]) -> None:
|
||||
"""Set callback for progress updates (fed to SSE queue)."""
|
||||
self._callback = callback
|
||||
|
||||
def start(
|
||||
self,
|
||||
frequency_khz: float,
|
||||
station: str = '',
|
||||
device_index: int = 0,
|
||||
gain: float = 40.0,
|
||||
ioc: int = DEFAULT_IOC,
|
||||
lpm: int = DEFAULT_LPM,
|
||||
direct_sampling: bool = True,
|
||||
) -> bool:
|
||||
"""Start WeFax decoder.
|
||||
|
||||
Args:
|
||||
frequency_khz: Frequency in kHz (e.g. 4298 for NOJ).
|
||||
station: Station callsign for metadata.
|
||||
device_index: RTL-SDR device index.
|
||||
gain: Receiver gain in dB.
|
||||
ioc: Index of Cooperation (576 or 288).
|
||||
lpm: Lines per minute (120 or 60).
|
||||
direct_sampling: Enable RTL-SDR direct sampling for HF.
|
||||
|
||||
Returns:
|
||||
True if started successfully.
|
||||
"""
|
||||
with self._lock:
|
||||
if self._running:
|
||||
return True
|
||||
|
||||
self._station = station
|
||||
self._frequency_khz = frequency_khz
|
||||
self._ioc = ioc
|
||||
self._lpm = lpm
|
||||
self._device_index = device_index
|
||||
self._gain = gain
|
||||
self._direct_sampling = direct_sampling
|
||||
self._sample_rate = DEFAULT_SAMPLE_RATE
|
||||
|
||||
try:
|
||||
self._running = True
|
||||
self._start_pipeline()
|
||||
|
||||
logger.info(
|
||||
f"WeFax decoder started: {frequency_khz} kHz, "
|
||||
f"station={station}, IOC={ioc}, LPM={lpm}"
|
||||
)
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='scanning',
|
||||
station=station,
|
||||
message=f'Scanning {frequency_khz} kHz for WeFax start tone...',
|
||||
))
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
self._running = False
|
||||
logger.error(f"Failed to start WeFax decoder: {e}")
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='error',
|
||||
message=str(e),
|
||||
))
|
||||
return False
|
||||
|
||||
def _start_pipeline(self) -> None:
|
||||
"""Start rtl_fm subprocess in USB mode for WeFax."""
|
||||
freq_hz = int(self._frequency_khz * 1000)
|
||||
|
||||
rtl_cmd = [
|
||||
'rtl_fm',
|
||||
'-d', str(self._device_index),
|
||||
'-f', str(freq_hz),
|
||||
'-M', 'usb',
|
||||
'-s', str(self._sample_rate),
|
||||
'-r', str(self._sample_rate),
|
||||
'-g', str(self._gain),
|
||||
]
|
||||
|
||||
if self._direct_sampling:
|
||||
rtl_cmd.extend(['-E', 'direct2'])
|
||||
|
||||
rtl_cmd.append('-')
|
||||
|
||||
logger.info(f"Starting rtl_fm: {' '.join(rtl_cmd)}")
|
||||
|
||||
self._rtl_process = subprocess.Popen(
|
||||
rtl_cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
|
||||
self._decode_thread = threading.Thread(
|
||||
target=self._decode_audio_stream, daemon=True)
|
||||
self._decode_thread.start()
|
||||
|
||||
def _decode_audio_stream(self) -> None:
|
||||
"""Read audio from rtl_fm and decode WeFax images.
|
||||
|
||||
Runs in a background thread. Processes 100ms chunks through
|
||||
the start-tone / phasing / image state machine.
|
||||
"""
|
||||
sr = self._sample_rate
|
||||
chunk_samples = sr // 10 # 100ms
|
||||
chunk_bytes = chunk_samples * 2 # int16
|
||||
|
||||
state = DecoderState.SCANNING
|
||||
start_tone_count = 0
|
||||
stop_tone_count = 0
|
||||
phasing_line_count = 0
|
||||
|
||||
# Image parameters
|
||||
pixels_per_line = int(math.pi * self._ioc)
|
||||
line_duration_s = 60.0 / self._lpm
|
||||
samples_per_line = int(line_duration_s * sr)
|
||||
|
||||
# Image buffer
|
||||
image_lines: list[np.ndarray] = []
|
||||
line_buffer = np.zeros(0, dtype=np.float64)
|
||||
max_lines = 2000 # Safety limit
|
||||
|
||||
rtl_fm_error = ''
|
||||
last_partial_line = -1
|
||||
|
||||
logger.info(
|
||||
f"WeFax decode thread started: IOC={self._ioc}, "
|
||||
f"LPM={self._lpm}, pixels/line={pixels_per_line}, "
|
||||
f"samples/line={samples_per_line}"
|
||||
)
|
||||
|
||||
while self._running and self._rtl_process:
|
||||
try:
|
||||
raw_data = self._rtl_process.stdout.read(chunk_bytes)
|
||||
if not raw_data:
|
||||
if self._running:
|
||||
stderr_msg = ''
|
||||
if self._rtl_process and self._rtl_process.stderr:
|
||||
with contextlib.suppress(Exception):
|
||||
stderr_msg = self._rtl_process.stderr.read().decode(
|
||||
errors='replace').strip()
|
||||
rc = self._rtl_process.poll() if self._rtl_process else None
|
||||
logger.warning(f"rtl_fm stream ended (exit code: {rc})")
|
||||
if stderr_msg:
|
||||
logger.warning(f"rtl_fm stderr: {stderr_msg}")
|
||||
rtl_fm_error = stderr_msg
|
||||
break
|
||||
|
||||
n_samples = len(raw_data) // 2
|
||||
if n_samples == 0:
|
||||
continue
|
||||
|
||||
raw_int16 = np.frombuffer(raw_data[:n_samples * 2], dtype=np.int16)
|
||||
samples = raw_int16.astype(np.float64) / 32768.0
|
||||
|
||||
# Emit scope waveform for frontend visualisation
|
||||
self._emit_scope(raw_int16)
|
||||
|
||||
if state == DecoderState.SCANNING:
|
||||
# Look for 300 Hz start tone
|
||||
if _detect_tone(samples, START_TONE_FREQ, sr, threshold=2.5):
|
||||
start_tone_count += 1
|
||||
# Need sustained detection (>= START_TONE_DURATION seconds)
|
||||
needed = int(START_TONE_DURATION / 0.1)
|
||||
if start_tone_count >= needed:
|
||||
state = DecoderState.PHASING
|
||||
phasing_line_count = 0
|
||||
logger.info("WeFax start tone detected, entering phasing")
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='phasing',
|
||||
station=self._station,
|
||||
message='Start tone detected, synchronising...',
|
||||
))
|
||||
else:
|
||||
start_tone_count = max(0, start_tone_count - 1)
|
||||
|
||||
elif state == DecoderState.PHASING:
|
||||
# Count phasing lines (alternating black/white pulses)
|
||||
phasing_line_count += 1
|
||||
needed_phasing = max(PHASING_MIN_LINES, int(2.0 / 0.1))
|
||||
if phasing_line_count >= needed_phasing:
|
||||
state = DecoderState.RECEIVING
|
||||
image_lines = []
|
||||
line_buffer = np.zeros(0, dtype=np.float64)
|
||||
last_partial_line = -1
|
||||
logger.info("Phasing complete, receiving image")
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='receiving',
|
||||
station=self._station,
|
||||
message='Receiving image...',
|
||||
))
|
||||
|
||||
elif state == DecoderState.RECEIVING:
|
||||
# Check for stop tone
|
||||
if _detect_tone(samples, STOP_TONE_FREQ, sr, threshold=2.5):
|
||||
stop_tone_count += 1
|
||||
needed_stop = int(STOP_TONE_DURATION / 0.1)
|
||||
if stop_tone_count >= needed_stop:
|
||||
# Process any remaining line buffer
|
||||
if len(line_buffer) >= samples_per_line * 0.5:
|
||||
line_pixels = self._decode_line(
|
||||
line_buffer, pixels_per_line, sr)
|
||||
image_lines.append(line_pixels)
|
||||
|
||||
state = DecoderState.COMPLETE
|
||||
logger.info(
|
||||
f"Stop tone detected, image complete: "
|
||||
f"{len(image_lines)} lines"
|
||||
)
|
||||
break
|
||||
else:
|
||||
stop_tone_count = max(0, stop_tone_count - 1)
|
||||
|
||||
# Accumulate samples into line buffer
|
||||
line_buffer = np.concatenate([line_buffer, samples])
|
||||
|
||||
# Extract complete lines
|
||||
while len(line_buffer) >= samples_per_line:
|
||||
line_samples = line_buffer[:samples_per_line]
|
||||
line_buffer = line_buffer[samples_per_line:]
|
||||
|
||||
line_pixels = self._decode_line(
|
||||
line_samples, pixels_per_line, sr)
|
||||
image_lines.append(line_pixels)
|
||||
|
||||
# Safety limit
|
||||
if len(image_lines) >= max_lines:
|
||||
logger.warning("WeFax max lines reached, saving image")
|
||||
state = DecoderState.COMPLETE
|
||||
break
|
||||
|
||||
# Emit progress periodically
|
||||
current_lines = len(image_lines)
|
||||
if current_lines > 0 and current_lines != last_partial_line and current_lines % 20 == 0:
|
||||
last_partial_line = current_lines
|
||||
# Rough progress estimate (typical chart ~800 lines)
|
||||
pct = min(95, int(current_lines / 8))
|
||||
partial_url = self._encode_partial(
|
||||
image_lines, pixels_per_line)
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='receiving',
|
||||
station=self._station,
|
||||
message=f'Receiving: {current_lines} lines',
|
||||
progress_percent=pct,
|
||||
line_count=current_lines,
|
||||
partial_image=partial_url,
|
||||
))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in WeFax decode thread: {e}")
|
||||
if not self._running:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
# Save image if we got data
|
||||
if state == DecoderState.COMPLETE and image_lines:
|
||||
self._save_image(image_lines, pixels_per_line)
|
||||
elif state == DecoderState.RECEIVING and len(image_lines) > 20:
|
||||
# Save partial image if we had significant data
|
||||
logger.info(f"Saving partial WeFax image: {len(image_lines)} lines")
|
||||
self._save_image(image_lines, pixels_per_line)
|
||||
|
||||
# Clean up
|
||||
with self._lock:
|
||||
was_running = self._running
|
||||
self._running = False
|
||||
if self._rtl_process:
|
||||
with contextlib.suppress(Exception):
|
||||
self._rtl_process.terminate()
|
||||
self._rtl_process.wait(timeout=2)
|
||||
self._rtl_process = None
|
||||
|
||||
if was_running:
|
||||
err_detail = rtl_fm_error.split('\n')[-1] if rtl_fm_error else ''
|
||||
if state != DecoderState.COMPLETE:
|
||||
msg = f'rtl_fm failed: {err_detail}' if err_detail else 'Decode stopped unexpectedly'
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='error', message=msg))
|
||||
else:
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='stopped', message='Decoder stopped'))
|
||||
|
||||
logger.info("WeFax decode thread ended")
|
||||
|
||||
def _decode_line(self, line_samples: np.ndarray,
|
||||
pixels_per_line: int, sample_rate: int) -> np.ndarray:
|
||||
"""Decode one scan line from audio samples to pixel values.
|
||||
|
||||
Uses instantaneous frequency estimation via the analytic signal
|
||||
(Hilbert transform), then maps frequency to grayscale.
|
||||
"""
|
||||
n = len(line_samples)
|
||||
pixels = np.zeros(pixels_per_line, dtype=np.uint8)
|
||||
|
||||
if n < pixels_per_line:
|
||||
return pixels
|
||||
|
||||
samples_per_pixel = n / pixels_per_line
|
||||
|
||||
# Use Hilbert transform for instantaneous frequency
|
||||
try:
|
||||
analytic = np.fft.ifft(
|
||||
np.fft.fft(line_samples) * 2 * (np.arange(n) < n // 2))
|
||||
inst_phase = np.unwrap(np.angle(analytic))
|
||||
inst_freq = np.diff(inst_phase) / (2.0 * math.pi) * sample_rate
|
||||
inst_freq = np.clip(inst_freq, BLACK_FREQ - 200, WHITE_FREQ + 200)
|
||||
|
||||
# Average frequency per pixel
|
||||
for px in range(pixels_per_line):
|
||||
start_idx = int(px * samples_per_pixel)
|
||||
end_idx = int((px + 1) * samples_per_pixel)
|
||||
end_idx = min(end_idx, len(inst_freq))
|
||||
if start_idx >= end_idx:
|
||||
continue
|
||||
avg_freq = float(np.mean(inst_freq[start_idx:end_idx]))
|
||||
pixels[px] = _freq_to_pixel(avg_freq)
|
||||
|
||||
except Exception:
|
||||
# Fallback: simple Goertzel per pixel window
|
||||
for px in range(pixels_per_line):
|
||||
start_idx = int(px * samples_per_pixel)
|
||||
end_idx = int((px + 1) * samples_per_pixel)
|
||||
if start_idx >= len(line_samples) or start_idx >= end_idx:
|
||||
break
|
||||
window = line_samples[start_idx:end_idx]
|
||||
freq = _estimate_frequency(window, sample_rate,
|
||||
BLACK_FREQ - 200, WHITE_FREQ + 200)
|
||||
pixels[px] = _freq_to_pixel(freq)
|
||||
|
||||
return pixels
|
||||
|
||||
def _encode_partial(self, image_lines: list[np.ndarray],
|
||||
width: int) -> str | None:
|
||||
"""Encode current image lines as a JPEG data URL for live preview."""
|
||||
if PILImage is None or not image_lines:
|
||||
return None
|
||||
try:
|
||||
height = len(image_lines)
|
||||
img_array = np.zeros((height, width), dtype=np.uint8)
|
||||
for i, line in enumerate(image_lines):
|
||||
img_array[i, :len(line)] = line[:width]
|
||||
img = PILImage.fromarray(img_array, mode='L')
|
||||
buf = io.BytesIO()
|
||||
img.save(buf, format='JPEG', quality=40)
|
||||
b64 = base64.b64encode(buf.getvalue()).decode('ascii')
|
||||
return f'data:image/jpeg;base64,{b64}'
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _save_image(self, image_lines: list[np.ndarray],
|
||||
width: int) -> None:
|
||||
"""Save completed image to disk."""
|
||||
if PILImage is None:
|
||||
logger.error("Cannot save image: Pillow not installed")
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='error',
|
||||
message='Cannot save image - Pillow not installed',
|
||||
))
|
||||
return
|
||||
|
||||
try:
|
||||
height = len(image_lines)
|
||||
img_array = np.zeros((height, width), dtype=np.uint8)
|
||||
for i, line in enumerate(image_lines):
|
||||
img_array[i, :len(line)] = line[:width]
|
||||
|
||||
img = PILImage.fromarray(img_array, mode='L')
|
||||
timestamp = datetime.now(timezone.utc)
|
||||
station_tag = self._station or 'unknown'
|
||||
filename = f"wefax_{timestamp.strftime('%Y%m%d_%H%M%S')}_{station_tag}.png"
|
||||
filepath = self._output_dir / filename
|
||||
img.save(filepath, 'PNG')
|
||||
|
||||
wefax_image = WeFaxImage(
|
||||
filename=filename,
|
||||
path=filepath,
|
||||
station=self._station,
|
||||
frequency_khz=self._frequency_khz,
|
||||
timestamp=timestamp,
|
||||
ioc=self._ioc,
|
||||
lpm=self._lpm,
|
||||
size_bytes=filepath.stat().st_size,
|
||||
)
|
||||
self._images.append(wefax_image)
|
||||
|
||||
logger.info(f"WeFax image saved: {filename} ({wefax_image.size_bytes} bytes)")
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='complete',
|
||||
station=self._station,
|
||||
message=f'Image decoded: {height} lines',
|
||||
progress_percent=100,
|
||||
line_count=height,
|
||||
image=wefax_image,
|
||||
))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving WeFax image: {e}")
|
||||
self._emit_progress(WeFaxProgress(
|
||||
status='error',
|
||||
message=f'Error saving image: {e}',
|
||||
))
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop WeFax decoder."""
|
||||
with self._lock:
|
||||
self._running = False
|
||||
|
||||
if self._rtl_process:
|
||||
try:
|
||||
self._rtl_process.terminate()
|
||||
self._rtl_process.wait(timeout=5)
|
||||
except Exception:
|
||||
with contextlib.suppress(Exception):
|
||||
self._rtl_process.kill()
|
||||
self._rtl_process = None
|
||||
|
||||
logger.info("WeFax decoder stopped")
|
||||
|
||||
def get_images(self) -> list[WeFaxImage]:
|
||||
"""Get list of decoded images."""
|
||||
self._scan_images()
|
||||
return list(self._images)
|
||||
|
||||
def delete_image(self, filename: str) -> bool:
|
||||
"""Delete a single decoded image."""
|
||||
filepath = self._output_dir / filename
|
||||
if not filepath.exists():
|
||||
return False
|
||||
filepath.unlink()
|
||||
self._images = [img for img in self._images if img.filename != filename]
|
||||
logger.info(f"Deleted WeFax image: {filename}")
|
||||
return True
|
||||
|
||||
def delete_all_images(self) -> int:
|
||||
"""Delete all decoded images. Returns count deleted."""
|
||||
count = 0
|
||||
for filepath in self._output_dir.glob('*.png'):
|
||||
filepath.unlink()
|
||||
count += 1
|
||||
self._images.clear()
|
||||
logger.info(f"Deleted all WeFax images ({count} files)")
|
||||
return count
|
||||
|
||||
def _scan_images(self) -> None:
|
||||
"""Scan output directory for images not yet tracked."""
|
||||
known = {img.filename for img in self._images}
|
||||
for filepath in self._output_dir.glob('*.png'):
|
||||
if filepath.name not in known:
|
||||
try:
|
||||
stat = filepath.stat()
|
||||
image = WeFaxImage(
|
||||
filename=filepath.name,
|
||||
path=filepath,
|
||||
station='',
|
||||
frequency_khz=0,
|
||||
timestamp=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
|
||||
ioc=self._ioc,
|
||||
lpm=self._lpm,
|
||||
size_bytes=stat.st_size,
|
||||
)
|
||||
self._images.append(image)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error scanning image {filepath}: {e}")
|
||||
|
||||
def _emit_progress(self, progress: WeFaxProgress) -> None:
|
||||
"""Emit progress update to callback."""
|
||||
if self._callback:
|
||||
try:
|
||||
self._callback(progress.to_dict())
|
||||
except Exception as e:
|
||||
logger.error(f"Error in progress callback: {e}")
|
||||
|
||||
def _emit_scope(self, raw_int16: np.ndarray) -> None:
|
||||
"""Emit scope waveform data for frontend visualisation."""
|
||||
if not self._callback:
|
||||
return
|
||||
|
||||
now = time.monotonic()
|
||||
if now - self._last_scope_time < 0.1:
|
||||
return
|
||||
self._last_scope_time = now
|
||||
|
||||
try:
|
||||
peak = int(np.max(np.abs(raw_int16)))
|
||||
rms = int(np.sqrt(np.mean(raw_int16.astype(np.float64) ** 2)))
|
||||
|
||||
# Downsample to 256 signed int8 values for lightweight transport
|
||||
window = raw_int16[-256:] if len(raw_int16) > 256 else raw_int16
|
||||
waveform = np.clip(window // 256, -127, 127).astype(np.int8).tolist()
|
||||
|
||||
self._callback({
|
||||
'type': 'scope',
|
||||
'rms': rms,
|
||||
'peak': peak,
|
||||
'waveform': waveform,
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Module-level singleton
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_decoder: WeFaxDecoder | None = None
|
||||
|
||||
|
||||
def get_wefax_decoder() -> WeFaxDecoder:
|
||||
"""Get or create the global WeFax decoder instance."""
|
||||
global _decoder
|
||||
if _decoder is None:
|
||||
_decoder = WeFaxDecoder()
|
||||
return _decoder
|
||||
@@ -0,0 +1,89 @@
|
||||
"""WeFax station database loader.
|
||||
|
||||
Loads and caches station data from data/wefax_stations.json. Provides
|
||||
lookup by callsign and current-broadcast filtering based on UTC time.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
_stations_cache: list[dict] | None = None
|
||||
_stations_by_callsign: dict[str, dict] = {}
|
||||
|
||||
_STATIONS_PATH = Path(__file__).resolve().parent.parent / 'data' / 'wefax_stations.json'
|
||||
|
||||
|
||||
def load_stations() -> list[dict]:
|
||||
"""Load all WeFax stations from JSON, caching on first call."""
|
||||
global _stations_cache, _stations_by_callsign
|
||||
|
||||
if _stations_cache is not None:
|
||||
return _stations_cache
|
||||
|
||||
with open(_STATIONS_PATH) as f:
|
||||
data = json.load(f)
|
||||
|
||||
_stations_cache = data.get('stations', [])
|
||||
_stations_by_callsign = {s['callsign']: s for s in _stations_cache}
|
||||
return _stations_cache
|
||||
|
||||
|
||||
def get_station(callsign: str) -> dict | None:
|
||||
"""Get a single station by callsign."""
|
||||
load_stations()
|
||||
return _stations_by_callsign.get(callsign.upper())
|
||||
|
||||
|
||||
def get_current_broadcasts(callsign: str) -> list[dict]:
|
||||
"""Return schedule entries closest to the current UTC time.
|
||||
|
||||
Returns up to 3 entries: the most recent past broadcast and the
|
||||
next two upcoming ones, annotated with ``minutes_until`` or
|
||||
``minutes_ago`` relative to now.
|
||||
"""
|
||||
station = get_station(callsign)
|
||||
if not station:
|
||||
return []
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
current_minutes = now.hour * 60 + now.minute
|
||||
|
||||
schedule = station.get('schedule', [])
|
||||
if not schedule:
|
||||
return []
|
||||
|
||||
# Convert schedule times to minutes-since-midnight for comparison
|
||||
entries: list[tuple[int, dict]] = []
|
||||
for entry in schedule:
|
||||
parts = entry['utc'].split(':')
|
||||
mins = int(parts[0]) * 60 + int(parts[1])
|
||||
entries.append((mins, entry))
|
||||
entries.sort(key=lambda x: x[0])
|
||||
|
||||
# Find closest entries relative to now
|
||||
results = []
|
||||
for mins, entry in entries:
|
||||
diff = mins - current_minutes
|
||||
# Wrap around midnight
|
||||
if diff < -720:
|
||||
diff += 1440
|
||||
elif diff > 720:
|
||||
diff -= 1440
|
||||
|
||||
annotated = dict(entry)
|
||||
if diff >= 0:
|
||||
annotated['minutes_until'] = diff
|
||||
else:
|
||||
annotated['minutes_ago'] = abs(diff)
|
||||
annotated['_sort_key'] = abs(diff)
|
||||
results.append(annotated)
|
||||
|
||||
results.sort(key=lambda x: x['_sort_key'])
|
||||
|
||||
# Return 3 nearest entries, clean up sort key
|
||||
for r in results:
|
||||
r.pop('_sort_key', None)
|
||||
return results[:3]
|
||||
Reference in New Issue
Block a user