Apply pending weather-sat and wefax updates

This commit is contained in:
Smittix
2026-02-24 21:46:58 +00:00
parent 53f54af871
commit 6c20b3d23f
10 changed files with 712 additions and 206 deletions

42
.claude/CLAUDE.md Normal file
View File

@@ -0,0 +1,42 @@
## Workflow Orchestration
### 1. Plan Node Default
- Enter plan mode for ANY non-trivial task (3+ steps or architectural decisions)
- If something goes sideways, STOP and re-plan immediately - don't keep pushing
- Use plan mode for verification steps, not just building
- Write detailed specs upfront to reduce ambiguity
### 2. Subagent Strategy
- Use subagents liberally to keep main context window clean
- Offload research, exploration, and parallel analysis to subagents
- For complex problems, throw more compute at it via subagents
- One tack per subagent for focused execution
### 3. Self-Improvement Loop
- After ANY correction from the user: update 'tasks/lessons.md" with the pattern
- Write rules for yourself that prevent the same mistake
- Ruthlessly iterate on these lessons until mistake rate drops
- Review lessons at session start for relevant project
### 4. Verification Before Done
- Never mark a task complete without proving it works
- Diff behavior between main and your changes when relevant
- Ask yourself: "Would a staff engineer approve this?"
- Run tests, check logs, demonstrate correctness
### 5. Demand Elegance (Balanced)
- For non-trivial changes: pause and ask "is there a more elegant way?"
- If a fix feels hacky: "Knowing everything I know now, implement the elegant solution"
- Skip this for simple, obvious fixes - don't over-engineer
-Challenge your own work before presenting it
### 6. Autonomous Bug Fizing
- When given a bug report: just fix it. Don't ask for hand-holding
- Point at logs, errors, failing tests - then resolve them
- Zero context switching required from the user
- Go fix failing CI tests without being told how
## Task Management
1. **Plan First**: Write plan to "tasks/todo.md" with checkable items
2. **Verify Plan**: Check in before starting implementation
3. **Track Progress**: Mark items complete as you go
4. **Explain Changes**: High-level summary at each step
5. **Document Results**: Add review section to 'tasks/todo.md"
6. **Capture Lessons**: Update 'tasks/lessons.md' after corrections
## Core Principles
- **Simplicity First**: Make every change as simple as possible. Impact minimal code.
- **No Laziness**: Find root causes. No temporary fixes. Senior developer standards.
- **Minimat Impact**: Changes should only touch what's necessary. Avoid introducing bugs.

View File

@@ -26,19 +26,48 @@ logger = get_logger('intercept.weather_sat')
weather_sat_bp = Blueprint('weather_sat', __name__, url_prefix='/weather-sat')
# Queue for SSE progress streaming
_weather_sat_queue: queue.Queue = queue.Queue(maxsize=100)
_weather_sat_queue: queue.Queue = queue.Queue(maxsize=100)
def _progress_callback(progress: CaptureProgress) -> None:
"""Callback to queue progress updates for SSE stream."""
try:
_weather_sat_queue.put_nowait(progress.to_dict())
def _progress_callback(progress: CaptureProgress) -> None:
"""Callback to queue progress updates for SSE stream."""
try:
_weather_sat_queue.put_nowait(progress.to_dict())
except queue.Full:
try:
_weather_sat_queue.get_nowait()
_weather_sat_queue.put_nowait(progress.to_dict())
except queue.Empty:
pass
except queue.Empty:
pass
def _release_weather_sat_device(device_index: int) -> None:
"""Release an SDR device only if weather-sat currently owns it."""
if device_index < 0:
return
try:
import app as app_module
except ImportError:
return
owner = None
get_status = getattr(app_module, 'get_sdr_device_status', None)
if callable(get_status):
try:
owner = get_status().get(device_index)
except Exception:
owner = None
if owner and owner != 'weather_sat':
logger.debug(
'Skipping SDR release for device %s owned by %s',
device_index,
owner,
)
return
app_module.release_sdr_device(device_index)
@weather_sat_bp.route('/status')
@@ -149,15 +178,11 @@ def start_capture():
except queue.Empty:
break
# Set callback and on-complete handler for SDR release
decoder.set_callback(_progress_callback)
def _release_device():
try:
import app as app_module
app_module.release_sdr_device(device_index)
except ImportError:
pass
# Set callback and on-complete handler for SDR release
decoder.set_callback(_progress_callback)
def _release_device():
_release_weather_sat_device(device_index)
decoder.set_on_complete(_release_device)
@@ -318,14 +343,9 @@ def stop_capture():
decoder = get_weather_sat_decoder()
device_index = decoder.device_index
decoder.stop()
# Release SDR device
try:
import app as app_module
app_module.release_sdr_device(device_index)
except ImportError:
pass
decoder.stop()
_release_weather_sat_device(device_index)
return jsonify({'status': 'stopped'})

View File

@@ -28,16 +28,29 @@ _wefax_queue: queue.Queue = queue.Queue(maxsize=100)
wefax_active_device: int | None = None
def _progress_callback(data: dict) -> None:
"""Callback to queue progress updates for SSE stream."""
try:
_wefax_queue.put_nowait(data)
except queue.Full:
try:
_wefax_queue.get_nowait()
_wefax_queue.put_nowait(data)
except queue.Empty:
pass
def _progress_callback(data: dict) -> None:
"""Callback to queue progress updates for SSE stream."""
global wefax_active_device
try:
_wefax_queue.put_nowait(data)
except queue.Full:
try:
_wefax_queue.get_nowait()
_wefax_queue.put_nowait(data)
except queue.Empty:
pass
# Ensure manually claimed SDR devices are always released when a
# decode session ends on its own (complete/error/stopped).
if (
isinstance(data, dict)
and data.get('type') == 'wefax_progress'
and data.get('status') in ('complete', 'error', 'stopped')
and wefax_active_device is not None
):
app_module.release_sdr_device(wefax_active_device)
wefax_active_device = None
@wefax_bp.route('/status')

View File

@@ -327,17 +327,25 @@ var WeFax = (function () {
if (idleEl) idleEl.style.display = 'none';
}
// Image complete
if (data.status === 'complete' && data.image) {
scopeImageBurst = 1.0;
loadImages();
setStatus('Image decoded: ' + (data.line_count || '?') + ' lines');
}
if (data.status === 'error') {
state.running = false;
updateButtons(false);
showStripError(data.message || 'Decode error');
// Image complete
if (data.status === 'complete' && data.image) {
scopeImageBurst = 1.0;
loadImages();
setStatus('Image decoded: ' + (data.line_count || '?') + ' lines');
}
if (data.status === 'complete') {
state.running = false;
updateButtons(false);
if (!state.schedulerEnabled) {
disconnectSSE();
}
}
if (data.status === 'error') {
state.running = false;
updateButtons(false);
showStripError(data.message || 'Decode error');
}
if (data.status === 'stopped') {

View File

@@ -0,0 +1,120 @@
"""Targeted regression tests for recent weather-satellite hardening fixes."""
from __future__ import annotations
import re
from unittest.mock import MagicMock, patch
import pytest
from utils.weather_sat import WeatherSatDecoder
@pytest.fixture
def authed_client(client):
"""Return a logged-in test client for authenticated weather-sat routes."""
with client.session_transaction() as session:
session['logged_in'] = True
return client
class TestWeatherSatRouteReleaseGuards:
"""Regression tests for safe SDR release behavior in weather-sat routes."""
def test_stop_does_not_release_device_owned_by_other_mode(self, authed_client):
"""POST /weather-sat/stop should not release a foreign-owned SDR device."""
mock_decoder = MagicMock()
mock_decoder.device_index = 2
with patch('routes.weather_sat.get_weather_sat_decoder', return_value=mock_decoder), \
patch('app.get_sdr_device_status', return_value={2: 'wifi'}), \
patch('app.release_sdr_device') as mock_release:
response = authed_client.post('/weather-sat/stop')
assert response.status_code == 200
assert response.get_json()['status'] == 'stopped'
mock_decoder.stop.assert_called_once()
mock_release.assert_not_called()
def test_stop_releases_device_owned_by_weather_sat(self, authed_client):
"""POST /weather-sat/stop should release SDR when weather-sat owns it."""
mock_decoder = MagicMock()
mock_decoder.device_index = 2
with patch('routes.weather_sat.get_weather_sat_decoder', return_value=mock_decoder), \
patch('app.get_sdr_device_status', return_value={2: 'weather_sat'}), \
patch('app.release_sdr_device') as mock_release:
response = authed_client.post('/weather-sat/stop')
assert response.status_code == 200
assert response.get_json()['status'] == 'stopped'
mock_decoder.stop.assert_called_once()
mock_release.assert_called_once_with(2)
def test_stop_skips_release_for_offline_decode_index(self, authed_client):
"""POST /weather-sat/stop should not release when decoder index is -1."""
mock_decoder = MagicMock()
mock_decoder.device_index = -1
with patch('routes.weather_sat.get_weather_sat_decoder', return_value=mock_decoder), \
patch('app.release_sdr_device') as mock_release:
response = authed_client.post('/weather-sat/stop')
assert response.status_code == 200
assert response.get_json()['status'] == 'stopped'
mock_decoder.stop.assert_called_once()
mock_release.assert_not_called()
class TestWeatherSatDecoderRegressions:
"""Regression tests for decoder filename and offline-device handling."""
def test_scan_output_dir_preserves_extension_and_sanitizes_filename(self, tmp_path):
"""Copied image names should stay safe and preserve JPG/JPEG extensions."""
output_dir = tmp_path / 'weather_sat_out'
capture_dir = tmp_path / 'capture'
capture_dir.mkdir(parents=True)
source_image = capture_dir / 'channel 3 (raw).jpeg'
source_image.write_bytes(b'\xff\xd8\xff' + b'\x00' * 2048)
with patch('shutil.which', return_value='/usr/bin/satdump'):
decoder = WeatherSatDecoder(output_dir=output_dir)
decoder._capture_output_dir = capture_dir
decoder._current_satellite = 'METEOR-M2-4'
decoder._current_mode = 'LRPT'
decoder._current_frequency = 137.9
decoder._scan_output_dir(set())
assert len(decoder._images) == 1
image = decoder._images[0]
assert image.filename.endswith('.jpeg')
assert re.fullmatch(r'[A-Za-z0-9_.-]+', image.filename)
assert (output_dir / image.filename).is_file()
def test_start_from_file_keeps_device_index_unclaimed(self, tmp_path):
"""Offline file decode should not claim or persist an SDR device index."""
with patch('shutil.which', return_value='/usr/bin/satdump'), \
patch('pathlib.Path.is_file', return_value=True), \
patch('pathlib.Path.resolve') as mock_resolve, \
patch.object(WeatherSatDecoder, '_start_satdump_offline') as mock_start:
resolved = MagicMock()
resolved.is_relative_to.return_value = True
mock_resolve.return_value = resolved
decoder = WeatherSatDecoder(output_dir=tmp_path / 'weather_sat_out')
success = decoder.start_from_file(
satellite='METEOR-M2-3',
input_file='data/weather_sat/samples/sample.wav',
sample_rate=1_000_000,
)
assert success is True
assert decoder.device_index == -1
mock_start.assert_called_once()
decoder.stop()
assert decoder.device_index == -1

View File

@@ -225,7 +225,7 @@ class TestWeFaxDecoder:
# Route tests
# ---------------------------------------------------------------------------
class TestWeFaxRoutes:
class TestWeFaxRoutes:
"""WeFax route endpoint tests."""
def test_status(self, client):
@@ -419,12 +419,54 @@ class TestWeFaxRoutes:
assert response.status_code == 400
def test_delete_image_wrong_extension(self, client):
"""DELETE /wefax/images/<filename> should reject non-PNG."""
_login_session(client)
mock_decoder = MagicMock()
with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder):
response = client.delete('/wefax/images/test.jpg')
assert response.status_code == 400
def test_delete_image_wrong_extension(self, client):
"""DELETE /wefax/images/<filename> should reject non-PNG."""
_login_session(client)
mock_decoder = MagicMock()
with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder):
response = client.delete('/wefax/images/test.jpg')
assert response.status_code == 400
class TestWeFaxProgressCallback:
"""Regression tests for WeFax route-level progress callback behavior."""
def test_terminal_progress_releases_active_device(self):
"""Terminal decoder events must release any manually claimed SDR."""
import routes.wefax as wefax_routes
original_device = wefax_routes.wefax_active_device
try:
wefax_routes.wefax_active_device = 3
with patch('routes.wefax.app_module.release_sdr_device') as mock_release:
wefax_routes._progress_callback({
'type': 'wefax_progress',
'status': 'error',
'message': 'decode failed',
})
mock_release.assert_called_once_with(3)
assert wefax_routes.wefax_active_device is None
finally:
wefax_routes.wefax_active_device = original_device
def test_non_terminal_progress_does_not_release_active_device(self):
"""Non-terminal progress updates must not release SDR ownership."""
import routes.wefax as wefax_routes
original_device = wefax_routes.wefax_active_device
try:
wefax_routes.wefax_active_device = 4
with patch('routes.wefax.app_module.release_sdr_device') as mock_release:
wefax_routes._progress_callback({
'type': 'wefax_progress',
'status': 'receiving',
'line_count': 120,
})
mock_release.assert_not_called()
assert wefax_routes.wefax_active_device == 4
finally:
wefax_routes.wefax_active_device = original_device

View File

@@ -0,0 +1,159 @@
"""Tests for WeFax auto-scheduler behavior and regressions."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
from utils.wefax_scheduler import ScheduledBroadcast, WeFaxScheduler
class TestWeFaxScheduler:
"""WeFaxScheduler regression tests."""
@patch('threading.Timer')
def test_refresh_reschedules_same_utc_slot_next_day(self, mock_timer):
"""Completed broadcasts must not block the next day's same UTC slot."""
scheduler = WeFaxScheduler()
scheduler._enabled = True
scheduler._station = 'USCG Kodiak'
scheduler._callsign = 'NOJ'
scheduler._frequency_khz = 4298.0
now = datetime.now(timezone.utc)
utc_time = (now - timedelta(hours=2)).strftime('%H:%M')
today = now.date().isoformat()
prior = ScheduledBroadcast(
station='USCG Kodiak',
callsign='NOJ',
frequency_khz=4298.0,
utc_time=utc_time,
duration_min=20,
content='Chart',
occurrence_date=today,
)
prior.status = 'complete'
scheduler._broadcasts = [prior]
mock_timer.return_value = MagicMock()
with patch('utils.wefax_scheduler.get_station', return_value={
'name': 'USCG Kodiak',
'schedule': [{
'utc': utc_time,
'duration_min': 20,
'content': 'Chart',
}],
}):
scheduler._refresh_schedule()
capture_calls = [
c for c in mock_timer.call_args_list
if len(c.args) >= 2 and getattr(c.args[1], '__name__', '') == '_execute_capture'
]
assert capture_calls, "Expected a capture timer for the next-day occurrence"
scheduled = [b for b in scheduler._broadcasts if b.status == 'scheduled']
assert len(scheduled) == 1
assert scheduled[0].occurrence_date != today
def test_execute_capture_stops_immediately_if_window_elapsed(self):
"""If stop delay computes to <= 0, capture should close out immediately."""
scheduler = WeFaxScheduler()
scheduler._enabled = True
scheduler._callsign = 'NOJ'
scheduler._frequency_khz = 4298.0
scheduler._device = 0
scheduler._gain = 40.0
scheduler._ioc = 576
scheduler._lpm = 120
scheduler._direct_sampling = True
now = datetime.now(timezone.utc)
sb = ScheduledBroadcast(
station='USCG Kodiak',
callsign='NOJ',
frequency_khz=4298.0,
utc_time=now.strftime('%H:%M'),
duration_min=0,
content='Late chart',
occurrence_date=now.date().isoformat(),
)
sb.status = 'scheduled'
mock_decoder = MagicMock()
mock_decoder.is_running = False
mock_decoder.start.return_value = True
with patch('utils.wefax_scheduler.get_wefax_decoder', return_value=mock_decoder), \
patch('utils.wefax_scheduler.WEFAX_CAPTURE_BUFFER_SECONDS', 0), \
patch('app.claim_sdr_device', return_value=None), \
patch.object(scheduler, '_stop_capture') as mock_stop_capture:
scheduler._execute_capture_inner(sb)
mock_stop_capture.assert_called_once()
@patch('threading.Timer')
def test_terminal_progress_releases_scheduler_device_early(self, mock_timer):
"""Scheduler captures must release SDR as soon as terminal progress arrives."""
scheduler = WeFaxScheduler()
scheduler._enabled = True
scheduler._callsign = 'NOJ'
scheduler._frequency_khz = 4298.0
scheduler._device = 0
scheduler._gain = 40.0
scheduler._ioc = 576
scheduler._lpm = 120
scheduler._direct_sampling = True
sb = ScheduledBroadcast(
station='USCG Kodiak',
callsign='NOJ',
frequency_khz=4298.0,
utc_time='12:00',
duration_min=20,
content='Chart',
occurrence_date='2026-01-01',
)
sb.status = 'scheduled'
mock_decoder = MagicMock()
mock_decoder.is_running = False
mock_decoder.start.return_value = True
mock_timer.return_value = MagicMock()
with patch('utils.wefax_scheduler.get_wefax_decoder', return_value=mock_decoder), \
patch('app.claim_sdr_device', return_value=None), \
patch('app.release_sdr_device') as mock_release:
scheduler._execute_capture_inner(sb)
progress_cb = mock_decoder.set_callback.call_args[0][0]
progress_cb({
'type': 'wefax_progress',
'status': 'error',
'message': 'rtl_fm failed',
})
mock_release.assert_called_once_with(0)
assert sb.status == 'skipped'
def test_stop_capture_non_capturing_only_releases(self):
"""_stop_capture should be idempotent when capture already ended."""
scheduler = WeFaxScheduler()
sb = ScheduledBroadcast(
station='USCG Kodiak',
callsign='NOJ',
frequency_khz=4298.0,
utc_time='12:00',
duration_min=20,
content='Chart',
occurrence_date='2026-01-01',
)
sb.status = 'complete'
release_fn = MagicMock()
with patch('utils.wefax_scheduler.get_wefax_decoder') as mock_get_decoder:
scheduler._stop_capture(sb, release_fn)
release_fn.assert_called_once()
mock_get_decoder.assert_not_called()

View File

@@ -173,7 +173,7 @@ class WeatherSatDecoder:
self._current_frequency: float = 0.0
self._current_mode: str = ''
self._capture_start_time: float = 0
self._device_index: int = 0
self._device_index: int = -1
self._capture_output_dir: Path | None = None
self._on_complete_callback: Callable[[], None] | None = None
self._capture_phase: str = 'idle'
@@ -303,12 +303,13 @@ class WeatherSatDecoder:
))
return False
self._current_satellite = satellite
self._current_frequency = sat_info['frequency']
self._current_mode = sat_info['mode']
self._capture_start_time = time.time()
self._capture_phase = 'decoding'
self._stop_event.clear()
self._current_satellite = satellite
self._current_frequency = sat_info['frequency']
self._current_mode = sat_info['mode']
self._device_index = -1 # Offline decode does not claim an SDR device
self._capture_start_time = time.time()
self._capture_phase = 'decoding'
self._stop_event.clear()
try:
self._running = True
@@ -464,15 +465,18 @@ class WeatherSatDecoder:
master_fd, slave_fd = pty.openpty()
self._pty_master_fd = master_fd
self._process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=slave_fd,
stdin=subprocess.DEVNULL,
close_fds=True,
)
register_process(self._process)
os.close(slave_fd) # parent doesn't need the slave side
self._process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=slave_fd,
stdin=subprocess.DEVNULL,
close_fds=True,
)
register_process(self._process)
try:
os.close(slave_fd) # parent doesn't need the slave side
except OSError:
pass
# Check for early exit asynchronously (avoid blocking /start for 3s)
def _check_early_exit():
@@ -564,15 +568,18 @@ class WeatherSatDecoder:
master_fd, slave_fd = pty.openpty()
self._pty_master_fd = master_fd
self._process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=slave_fd,
stdin=subprocess.DEVNULL,
close_fds=True,
)
register_process(self._process)
os.close(slave_fd) # parent doesn't need the slave side
self._process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=slave_fd,
stdin=subprocess.DEVNULL,
close_fds=True,
)
register_process(self._process)
try:
os.close(slave_fd) # parent doesn't need the slave side
except OSError:
pass
# For offline mode, don't check for early exit — file decoding
# may complete very quickly and exit code 0 is normal success.
@@ -892,16 +899,24 @@ class WeatherSatDecoder:
except OSError:
continue
# Determine product type from filename/path
product = self._parse_product_name(filepath)
# Copy image to main output dir for serving
serve_name = f"{self._current_satellite}_{filepath.stem}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
serve_path = self._output_dir / serve_name
try:
shutil.copy2(filepath, serve_path)
except OSError:
# Copy failed — don't mark as known so it can be retried
# Determine product type from filename/path
product = self._parse_product_name(filepath)
# Copy image to main output dir for serving
safe_sat = re.sub(r'[^A-Za-z0-9_-]+', '_', self._current_satellite).strip('_') or 'satellite'
safe_stem = re.sub(r'[^A-Za-z0-9_-]+', '_', filepath.stem).strip('_') or 'image'
suffix = filepath.suffix.lower()
if suffix not in ('.png', '.jpg', '.jpeg'):
suffix = '.png'
serve_name = (
f"{safe_sat}_{safe_stem}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
f"{suffix}"
)
serve_path = self._output_dir / serve_name
try:
shutil.copy2(filepath, serve_path)
except OSError:
# Copy failed — don't mark as known so it can be retried
continue
# Only mark as known after successful copy
@@ -945,12 +960,12 @@ class WeatherSatDecoder:
return 'Multispectral Analysis'
if 'thermal' in name or 'temp' in name:
return 'Thermal'
if 'ndvi' in name:
return 'NDVI Vegetation'
if 'channel' in name or 'ch' in name:
match = re.search(r'(?:channel|ch)\s*(\d+)', name)
if match:
return f'Channel {match.group(1)}'
if 'ndvi' in name:
return 'NDVI Vegetation'
if 'channel' in name or 'ch' in name:
match = re.search(r'(?:channel|ch)[\s_-]*(\d+)', name)
if match:
return f'Channel {match.group(1)}'
if 'avhrr' in name:
return 'AVHRR'
if 'msu' in name or 'mtvza' in name:
@@ -972,12 +987,13 @@ class WeatherSatDecoder:
self._stop_event.set()
self._close_pty()
if self._process:
safe_terminate(self._process)
self._process = None
elapsed = int(time.time() - self._capture_start_time) if self._capture_start_time else 0
logger.info(f"Weather satellite capture stopped after {elapsed}s")
if self._process:
safe_terminate(self._process)
self._process = None
elapsed = int(time.time() - self._capture_start_time) if self._capture_start_time else 0
logger.info(f"Weather satellite capture stopped after {elapsed}s")
self._device_index = -1
def get_images(self) -> list[WeatherSatImage]:
"""Get list of decoded images."""
@@ -1013,17 +1029,18 @@ class WeatherSatDecoder:
sat_info = WEATHER_SATELLITES.get(satellite, {})
image = WeatherSatImage(
filename=filepath.name,
path=filepath,
satellite=satellite,
mode=sat_info.get('mode', 'Unknown'),
image = WeatherSatImage(
filename=filepath.name,
path=filepath,
satellite=satellite,
mode=sat_info.get('mode', 'Unknown'),
timestamp=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
frequency=sat_info.get('frequency', 0.0),
size_bytes=stat.st_size,
product=self._parse_product_name(filepath),
)
self._images.append(image)
size_bytes=stat.st_size,
product=self._parse_product_name(filepath),
)
self._images.append(image)
known_filenames.add(filepath.name)
def delete_image(self, filename: str) -> bool:
"""Delete a decoded image."""

View File

@@ -319,12 +319,26 @@ class WeatherSatScheduler:
if self._progress_callback:
decoder.set_callback(self._progress_callback)
def _release_device():
try:
import app as app_module
app_module.release_sdr_device(self._device)
except ImportError:
pass
def _release_device():
try:
import app as app_module
owner = None
get_status = getattr(app_module, 'get_sdr_device_status', None)
if callable(get_status):
try:
owner = get_status().get(self._device)
except Exception:
owner = None
if owner and owner != 'weather_sat':
logger.debug(
"Skipping SDR release for device %s owned by %s",
self._device,
owner,
)
return
app_module.release_sdr_device(self._device)
except ImportError:
pass
decoder.set_on_complete(lambda: self._on_capture_complete(sp, _release_device))

View File

@@ -31,28 +31,30 @@ except ImportError:
WEFAX_CAPTURE_BUFFER_SECONDS = 30
class ScheduledBroadcast:
class ScheduledBroadcast:
"""A broadcast scheduled for automatic capture."""
def __init__(
self,
station: str,
callsign: str,
frequency_khz: float,
utc_time: str,
duration_min: int,
content: str,
):
self.id: str = str(uuid.uuid4())[:8]
self.station = station
self.callsign = callsign
self.frequency_khz = frequency_khz
self.utc_time = utc_time
self.duration_min = duration_min
self.content = content
self.status: str = 'scheduled' # scheduled, capturing, complete, skipped
self._timer: threading.Timer | None = None
self._stop_timer: threading.Timer | None = None
def __init__(
self,
station: str,
callsign: str,
frequency_khz: float,
utc_time: str,
duration_min: int,
content: str,
occurrence_date: str = '',
):
self.id: str = str(uuid.uuid4())[:8]
self.station = station
self.callsign = callsign
self.frequency_khz = frequency_khz
self.utc_time = utc_time
self.duration_min = duration_min
self.content = content
self.occurrence_date = occurrence_date
self.status: str = 'scheduled' # scheduled, capturing, complete, skipped
self._timer: threading.Timer | None = None
self._stop_timer: threading.Timer | None = None
def to_dict(self) -> dict[str, Any]:
return {
@@ -60,14 +62,15 @@ class ScheduledBroadcast:
'station': self.station,
'callsign': self.callsign,
'frequency_khz': self.frequency_khz,
'utc_time': self.utc_time,
'duration_min': self.duration_min,
'content': self.content,
'status': self.status,
}
'utc_time': self.utc_time,
'duration_min': self.duration_min,
'content': self.content,
'occurrence_date': self.occurrence_date,
'status': self.status,
}
class WeFaxScheduler:
class WeFaxScheduler:
"""Auto-scheduler for WeFax broadcast captures."""
def __init__(self):
@@ -204,10 +207,15 @@ class WeFaxScheduler:
'total_broadcasts': len(self._broadcasts),
}
def get_broadcasts(self) -> list[dict[str, Any]]:
"""Get list of scheduled broadcasts."""
with self._lock:
return [b.to_dict() for b in self._broadcasts]
def get_broadcasts(self) -> list[dict[str, Any]]:
"""Get list of scheduled broadcasts."""
with self._lock:
return [b.to_dict() for b in self._broadcasts]
@staticmethod
def _history_key(callsign: str, utc_time: str, occurrence_date: str) -> str:
"""Build a stable key for one station UTC slot on one calendar day."""
return f'{callsign}_{utc_time}_{occurrence_date}'
def _refresh_schedule(self) -> None:
"""Recompute broadcast schedule and set timers."""
@@ -269,24 +277,34 @@ class WeFaxScheduler:
minutes=duration_min, seconds=buffer
)
capture_start = broadcast_dt - timedelta(seconds=buffer)
# Check if already in history
history_key = f"{self._callsign}_{utc_time}"
if any(
f"{h.callsign}_{h.utc_time}" == history_key
for h in history
):
continue
sb = ScheduledBroadcast(
capture_start = broadcast_dt - timedelta(seconds=buffer)
occurrence_date = broadcast_dt.date().isoformat()
# Check if this specific day/slot was already processed.
history_key = self._history_key(
self._callsign,
utc_time,
occurrence_date,
)
if any(
self._history_key(
h.callsign,
h.utc_time,
getattr(h, 'occurrence_date', ''),
) == history_key
for h in history
):
continue
sb = ScheduledBroadcast(
station=self._station,
callsign=self._callsign,
frequency_khz=self._frequency_khz,
utc_time=utc_time,
duration_min=duration_min,
content=content,
)
frequency_khz=self._frequency_khz,
utc_time=utc_time,
duration_min=duration_min,
content=content,
occurrence_date=occurrence_date,
)
# Schedule capture timer
delay = max(0.0, (capture_start - now).total_seconds())
@@ -371,18 +389,57 @@ class WeFaxScheduler:
except ImportError:
pass
sb.status = 'capturing'
# Set up callbacks
if self._progress_callback:
decoder.set_callback(self._progress_callback)
def _release_device():
try:
import app as app_module
app_module.release_sdr_device(self._device)
except ImportError:
pass
sb.status = 'capturing'
def _release_device():
try:
import app as app_module
app_module.release_sdr_device(self._device)
except ImportError:
pass
released = False
release_lock = threading.Lock()
def _release_device_once() -> None:
nonlocal released
with release_lock:
if released:
return
released = True
_release_device()
def _scheduler_progress_callback(progress: dict) -> None:
"""Forward progress updates and release scheduler resources on terminal states."""
if self._progress_callback:
self._progress_callback(progress)
if not isinstance(progress, dict) or progress.get('type') != 'wefax_progress':
return
status = progress.get('status')
if status not in ('complete', 'error', 'stopped'):
return
if sb.status == 'capturing':
if status == 'complete':
sb.status = 'complete'
self._emit_event({
'type': 'schedule_capture_complete',
'broadcast': sb.to_dict(),
})
else:
sb.status = 'skipped'
self._emit_event({
'type': 'schedule_capture_skipped',
'broadcast': sb.to_dict(),
'reason': 'decoder_error',
'detail': progress.get('message', ''),
})
_release_device_once()
decoder.set_callback(_scheduler_progress_callback)
success = decoder.start(
frequency_khz=self._frequency_khz,
@@ -414,39 +471,53 @@ class WeFaxScheduler:
minutes=sb.duration_min,
seconds=WEFAX_CAPTURE_BUFFER_SECONDS,
)
stop_delay = max(0.0, (stop_dt - now).total_seconds())
if stop_delay > 0:
sb._stop_timer = threading.Timer(
stop_delay, self._stop_capture, args=[sb, _release_device]
)
sb._stop_timer.daemon = True
sb._stop_timer.start()
else:
sb.status = 'skipped'
_release_device()
self._emit_event({
'type': 'schedule_capture_skipped',
'broadcast': sb.to_dict(),
'reason': 'start_failed',
'detail': decoder.last_error or 'unknown error',
stop_delay = max(0.0, (stop_dt - now).total_seconds())
if stop_delay > 0:
sb._stop_timer = threading.Timer(
stop_delay, self._stop_capture, args=[sb, _release_device_once]
)
sb._stop_timer.daemon = True
sb._stop_timer.start()
else:
# If execution was delayed beyond end-of-window, close out
# immediately so SDR allocation is never stranded.
logger.warning(
"Capture window already elapsed for %s at %s UTC; stopping immediately",
sb.content,
sb.utc_time,
)
self._stop_capture(sb, _release_device_once)
else:
sb.status = 'skipped'
_release_device_once()
self._emit_event({
'type': 'schedule_capture_skipped',
'broadcast': sb.to_dict(),
'reason': 'start_failed',
'detail': decoder.last_error or 'unknown error',
})
def _stop_capture(
self, sb: ScheduledBroadcast, release_fn: Callable
) -> None:
"""Stop capture at broadcast end."""
decoder = get_wefax_decoder()
if decoder.is_running:
decoder.stop()
logger.info("Auto-scheduler stopped capture: %s", sb.content)
sb.status = 'complete'
release_fn()
self._emit_event({
'type': 'schedule_capture_complete',
'broadcast': sb.to_dict(),
})
def _stop_capture(
self, sb: ScheduledBroadcast, release_fn: Callable
) -> None:
"""Stop capture at broadcast end."""
if sb.status != 'capturing':
release_fn()
return
sb.status = 'complete'
decoder = get_wefax_decoder()
if decoder.is_running:
decoder.stop()
logger.info("Auto-scheduler stopped capture: %s", sb.content)
release_fn()
self._emit_event({
'type': 'schedule_capture_complete',
'broadcast': sb.to_dict(),
})
def _emit_event(self, event: dict[str, Any]) -> None:
"""Emit scheduler event to callback."""