diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md new file mode 100644 index 0000000..6e58a0f --- /dev/null +++ b/.claude/CLAUDE.md @@ -0,0 +1,42 @@ +## Workflow Orchestration +### 1. Plan Node Default +- Enter plan mode for ANY non-trivial task (3+ steps or architectural decisions) +- If something goes sideways, STOP and re-plan immediately - don't keep pushing +- Use plan mode for verification steps, not just building +- Write detailed specs upfront to reduce ambiguity +### 2. Subagent Strategy +- Use subagents liberally to keep main context window clean +- Offload research, exploration, and parallel analysis to subagents +- For complex problems, throw more compute at it via subagents +- One tack per subagent for focused execution +### 3. Self-Improvement Loop +- After ANY correction from the user: update 'tasks/lessons.md" with the pattern +- Write rules for yourself that prevent the same mistake +- Ruthlessly iterate on these lessons until mistake rate drops +- Review lessons at session start for relevant project +### 4. Verification Before Done +- Never mark a task complete without proving it works +- Diff behavior between main and your changes when relevant +- Ask yourself: "Would a staff engineer approve this?" +- Run tests, check logs, demonstrate correctness +### 5. Demand Elegance (Balanced) +- For non-trivial changes: pause and ask "is there a more elegant way?" +- If a fix feels hacky: "Knowing everything I know now, implement the elegant solution" +- Skip this for simple, obvious fixes - don't over-engineer +-Challenge your own work before presenting it +### 6. Autonomous Bug Fizing +- When given a bug report: just fix it. Don't ask for hand-holding +- Point at logs, errors, failing tests - then resolve them +- Zero context switching required from the user +- Go fix failing CI tests without being told how +## Task Management +1. **Plan First**: Write plan to "tasks/todo.md" with checkable items +2. **Verify Plan**: Check in before starting implementation +3. **Track Progress**: Mark items complete as you go +4. **Explain Changes**: High-level summary at each step +5. **Document Results**: Add review section to 'tasks/todo.md" +6. **Capture Lessons**: Update 'tasks/lessons.md' after corrections +## Core Principles +- **Simplicity First**: Make every change as simple as possible. Impact minimal code. +- **No Laziness**: Find root causes. No temporary fixes. Senior developer standards. +- **Minimat Impact**: Changes should only touch what's necessary. Avoid introducing bugs. diff --git a/routes/weather_sat.py b/routes/weather_sat.py index 9522539..023d9cf 100644 --- a/routes/weather_sat.py +++ b/routes/weather_sat.py @@ -26,19 +26,48 @@ logger = get_logger('intercept.weather_sat') weather_sat_bp = Blueprint('weather_sat', __name__, url_prefix='/weather-sat') # Queue for SSE progress streaming -_weather_sat_queue: queue.Queue = queue.Queue(maxsize=100) +_weather_sat_queue: queue.Queue = queue.Queue(maxsize=100) -def _progress_callback(progress: CaptureProgress) -> None: - """Callback to queue progress updates for SSE stream.""" - try: - _weather_sat_queue.put_nowait(progress.to_dict()) +def _progress_callback(progress: CaptureProgress) -> None: + """Callback to queue progress updates for SSE stream.""" + try: + _weather_sat_queue.put_nowait(progress.to_dict()) except queue.Full: try: _weather_sat_queue.get_nowait() _weather_sat_queue.put_nowait(progress.to_dict()) - except queue.Empty: - pass + except queue.Empty: + pass + + +def _release_weather_sat_device(device_index: int) -> None: + """Release an SDR device only if weather-sat currently owns it.""" + if device_index < 0: + return + + try: + import app as app_module + except ImportError: + return + + owner = None + get_status = getattr(app_module, 'get_sdr_device_status', None) + if callable(get_status): + try: + owner = get_status().get(device_index) + except Exception: + owner = None + + if owner and owner != 'weather_sat': + logger.debug( + 'Skipping SDR release for device %s owned by %s', + device_index, + owner, + ) + return + + app_module.release_sdr_device(device_index) @weather_sat_bp.route('/status') @@ -149,15 +178,11 @@ def start_capture(): except queue.Empty: break - # Set callback and on-complete handler for SDR release - decoder.set_callback(_progress_callback) - - def _release_device(): - try: - import app as app_module - app_module.release_sdr_device(device_index) - except ImportError: - pass + # Set callback and on-complete handler for SDR release + decoder.set_callback(_progress_callback) + + def _release_device(): + _release_weather_sat_device(device_index) decoder.set_on_complete(_release_device) @@ -318,14 +343,9 @@ def stop_capture(): decoder = get_weather_sat_decoder() device_index = decoder.device_index - decoder.stop() - - # Release SDR device - try: - import app as app_module - app_module.release_sdr_device(device_index) - except ImportError: - pass + decoder.stop() + + _release_weather_sat_device(device_index) return jsonify({'status': 'stopped'}) diff --git a/routes/wefax.py b/routes/wefax.py index bc81132..d5c58e1 100644 --- a/routes/wefax.py +++ b/routes/wefax.py @@ -28,16 +28,29 @@ _wefax_queue: queue.Queue = queue.Queue(maxsize=100) wefax_active_device: int | None = None -def _progress_callback(data: dict) -> None: - """Callback to queue progress updates for SSE stream.""" - try: - _wefax_queue.put_nowait(data) - except queue.Full: - try: - _wefax_queue.get_nowait() - _wefax_queue.put_nowait(data) - except queue.Empty: - pass +def _progress_callback(data: dict) -> None: + """Callback to queue progress updates for SSE stream.""" + global wefax_active_device + + try: + _wefax_queue.put_nowait(data) + except queue.Full: + try: + _wefax_queue.get_nowait() + _wefax_queue.put_nowait(data) + except queue.Empty: + pass + + # Ensure manually claimed SDR devices are always released when a + # decode session ends on its own (complete/error/stopped). + if ( + isinstance(data, dict) + and data.get('type') == 'wefax_progress' + and data.get('status') in ('complete', 'error', 'stopped') + and wefax_active_device is not None + ): + app_module.release_sdr_device(wefax_active_device) + wefax_active_device = None @wefax_bp.route('/status') diff --git a/static/js/modes/wefax.js b/static/js/modes/wefax.js index acd913f..a5a1902 100644 --- a/static/js/modes/wefax.js +++ b/static/js/modes/wefax.js @@ -327,17 +327,25 @@ var WeFax = (function () { if (idleEl) idleEl.style.display = 'none'; } - // Image complete - if (data.status === 'complete' && data.image) { - scopeImageBurst = 1.0; - loadImages(); - setStatus('Image decoded: ' + (data.line_count || '?') + ' lines'); - } - - if (data.status === 'error') { - state.running = false; - updateButtons(false); - showStripError(data.message || 'Decode error'); + // Image complete + if (data.status === 'complete' && data.image) { + scopeImageBurst = 1.0; + loadImages(); + setStatus('Image decoded: ' + (data.line_count || '?') + ' lines'); + } + + if (data.status === 'complete') { + state.running = false; + updateButtons(false); + if (!state.schedulerEnabled) { + disconnectSSE(); + } + } + + if (data.status === 'error') { + state.running = false; + updateButtons(false); + showStripError(data.message || 'Decode error'); } if (data.status === 'stopped') { diff --git a/tests/test_weather_sat_regressions.py b/tests/test_weather_sat_regressions.py new file mode 100644 index 0000000..737751a --- /dev/null +++ b/tests/test_weather_sat_regressions.py @@ -0,0 +1,120 @@ +"""Targeted regression tests for recent weather-satellite hardening fixes.""" + +from __future__ import annotations + +import re +from unittest.mock import MagicMock, patch + +import pytest + +from utils.weather_sat import WeatherSatDecoder + + +@pytest.fixture +def authed_client(client): + """Return a logged-in test client for authenticated weather-sat routes.""" + with client.session_transaction() as session: + session['logged_in'] = True + return client + + +class TestWeatherSatRouteReleaseGuards: + """Regression tests for safe SDR release behavior in weather-sat routes.""" + + def test_stop_does_not_release_device_owned_by_other_mode(self, authed_client): + """POST /weather-sat/stop should not release a foreign-owned SDR device.""" + mock_decoder = MagicMock() + mock_decoder.device_index = 2 + + with patch('routes.weather_sat.get_weather_sat_decoder', return_value=mock_decoder), \ + patch('app.get_sdr_device_status', return_value={2: 'wifi'}), \ + patch('app.release_sdr_device') as mock_release: + response = authed_client.post('/weather-sat/stop') + + assert response.status_code == 200 + assert response.get_json()['status'] == 'stopped' + mock_decoder.stop.assert_called_once() + mock_release.assert_not_called() + + def test_stop_releases_device_owned_by_weather_sat(self, authed_client): + """POST /weather-sat/stop should release SDR when weather-sat owns it.""" + mock_decoder = MagicMock() + mock_decoder.device_index = 2 + + with patch('routes.weather_sat.get_weather_sat_decoder', return_value=mock_decoder), \ + patch('app.get_sdr_device_status', return_value={2: 'weather_sat'}), \ + patch('app.release_sdr_device') as mock_release: + response = authed_client.post('/weather-sat/stop') + + assert response.status_code == 200 + assert response.get_json()['status'] == 'stopped' + mock_decoder.stop.assert_called_once() + mock_release.assert_called_once_with(2) + + def test_stop_skips_release_for_offline_decode_index(self, authed_client): + """POST /weather-sat/stop should not release when decoder index is -1.""" + mock_decoder = MagicMock() + mock_decoder.device_index = -1 + + with patch('routes.weather_sat.get_weather_sat_decoder', return_value=mock_decoder), \ + patch('app.release_sdr_device') as mock_release: + response = authed_client.post('/weather-sat/stop') + + assert response.status_code == 200 + assert response.get_json()['status'] == 'stopped' + mock_decoder.stop.assert_called_once() + mock_release.assert_not_called() + + +class TestWeatherSatDecoderRegressions: + """Regression tests for decoder filename and offline-device handling.""" + + def test_scan_output_dir_preserves_extension_and_sanitizes_filename(self, tmp_path): + """Copied image names should stay safe and preserve JPG/JPEG extensions.""" + output_dir = tmp_path / 'weather_sat_out' + capture_dir = tmp_path / 'capture' + capture_dir.mkdir(parents=True) + + source_image = capture_dir / 'channel 3 (raw).jpeg' + source_image.write_bytes(b'\xff\xd8\xff' + b'\x00' * 2048) + + with patch('shutil.which', return_value='/usr/bin/satdump'): + decoder = WeatherSatDecoder(output_dir=output_dir) + + decoder._capture_output_dir = capture_dir + decoder._current_satellite = 'METEOR-M2-4' + decoder._current_mode = 'LRPT' + decoder._current_frequency = 137.9 + + decoder._scan_output_dir(set()) + + assert len(decoder._images) == 1 + image = decoder._images[0] + assert image.filename.endswith('.jpeg') + assert re.fullmatch(r'[A-Za-z0-9_.-]+', image.filename) + assert (output_dir / image.filename).is_file() + + def test_start_from_file_keeps_device_index_unclaimed(self, tmp_path): + """Offline file decode should not claim or persist an SDR device index.""" + with patch('shutil.which', return_value='/usr/bin/satdump'), \ + patch('pathlib.Path.is_file', return_value=True), \ + patch('pathlib.Path.resolve') as mock_resolve, \ + patch.object(WeatherSatDecoder, '_start_satdump_offline') as mock_start: + + resolved = MagicMock() + resolved.is_relative_to.return_value = True + mock_resolve.return_value = resolved + + decoder = WeatherSatDecoder(output_dir=tmp_path / 'weather_sat_out') + success = decoder.start_from_file( + satellite='METEOR-M2-3', + input_file='data/weather_sat/samples/sample.wav', + sample_rate=1_000_000, + ) + + assert success is True + assert decoder.device_index == -1 + mock_start.assert_called_once() + + decoder.stop() + assert decoder.device_index == -1 diff --git a/tests/test_wefax.py b/tests/test_wefax.py index 4ac3b72..2f23d1a 100644 --- a/tests/test_wefax.py +++ b/tests/test_wefax.py @@ -225,7 +225,7 @@ class TestWeFaxDecoder: # Route tests # --------------------------------------------------------------------------- -class TestWeFaxRoutes: +class TestWeFaxRoutes: """WeFax route endpoint tests.""" def test_status(self, client): @@ -419,12 +419,54 @@ class TestWeFaxRoutes: assert response.status_code == 400 - def test_delete_image_wrong_extension(self, client): - """DELETE /wefax/images/ should reject non-PNG.""" - _login_session(client) - mock_decoder = MagicMock() - - with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder): - response = client.delete('/wefax/images/test.jpg') - - assert response.status_code == 400 + def test_delete_image_wrong_extension(self, client): + """DELETE /wefax/images/ should reject non-PNG.""" + _login_session(client) + mock_decoder = MagicMock() + + with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder): + response = client.delete('/wefax/images/test.jpg') + + assert response.status_code == 400 + + +class TestWeFaxProgressCallback: + """Regression tests for WeFax route-level progress callback behavior.""" + + def test_terminal_progress_releases_active_device(self): + """Terminal decoder events must release any manually claimed SDR.""" + import routes.wefax as wefax_routes + + original_device = wefax_routes.wefax_active_device + try: + wefax_routes.wefax_active_device = 3 + with patch('routes.wefax.app_module.release_sdr_device') as mock_release: + wefax_routes._progress_callback({ + 'type': 'wefax_progress', + 'status': 'error', + 'message': 'decode failed', + }) + + mock_release.assert_called_once_with(3) + assert wefax_routes.wefax_active_device is None + finally: + wefax_routes.wefax_active_device = original_device + + def test_non_terminal_progress_does_not_release_active_device(self): + """Non-terminal progress updates must not release SDR ownership.""" + import routes.wefax as wefax_routes + + original_device = wefax_routes.wefax_active_device + try: + wefax_routes.wefax_active_device = 4 + with patch('routes.wefax.app_module.release_sdr_device') as mock_release: + wefax_routes._progress_callback({ + 'type': 'wefax_progress', + 'status': 'receiving', + 'line_count': 120, + }) + + mock_release.assert_not_called() + assert wefax_routes.wefax_active_device == 4 + finally: + wefax_routes.wefax_active_device = original_device diff --git a/tests/test_wefax_scheduler.py b/tests/test_wefax_scheduler.py new file mode 100644 index 0000000..93dbee2 --- /dev/null +++ b/tests/test_wefax_scheduler.py @@ -0,0 +1,159 @@ +"""Tests for WeFax auto-scheduler behavior and regressions.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock, patch + +from utils.wefax_scheduler import ScheduledBroadcast, WeFaxScheduler + + +class TestWeFaxScheduler: + """WeFaxScheduler regression tests.""" + + @patch('threading.Timer') + def test_refresh_reschedules_same_utc_slot_next_day(self, mock_timer): + """Completed broadcasts must not block the next day's same UTC slot.""" + scheduler = WeFaxScheduler() + scheduler._enabled = True + scheduler._station = 'USCG Kodiak' + scheduler._callsign = 'NOJ' + scheduler._frequency_khz = 4298.0 + + now = datetime.now(timezone.utc) + utc_time = (now - timedelta(hours=2)).strftime('%H:%M') + today = now.date().isoformat() + + prior = ScheduledBroadcast( + station='USCG Kodiak', + callsign='NOJ', + frequency_khz=4298.0, + utc_time=utc_time, + duration_min=20, + content='Chart', + occurrence_date=today, + ) + prior.status = 'complete' + scheduler._broadcasts = [prior] + + mock_timer.return_value = MagicMock() + + with patch('utils.wefax_scheduler.get_station', return_value={ + 'name': 'USCG Kodiak', + 'schedule': [{ + 'utc': utc_time, + 'duration_min': 20, + 'content': 'Chart', + }], + }): + scheduler._refresh_schedule() + + capture_calls = [ + c for c in mock_timer.call_args_list + if len(c.args) >= 2 and getattr(c.args[1], '__name__', '') == '_execute_capture' + ] + assert capture_calls, "Expected a capture timer for the next-day occurrence" + + scheduled = [b for b in scheduler._broadcasts if b.status == 'scheduled'] + assert len(scheduled) == 1 + assert scheduled[0].occurrence_date != today + + def test_execute_capture_stops_immediately_if_window_elapsed(self): + """If stop delay computes to <= 0, capture should close out immediately.""" + scheduler = WeFaxScheduler() + scheduler._enabled = True + scheduler._callsign = 'NOJ' + scheduler._frequency_khz = 4298.0 + scheduler._device = 0 + scheduler._gain = 40.0 + scheduler._ioc = 576 + scheduler._lpm = 120 + scheduler._direct_sampling = True + + now = datetime.now(timezone.utc) + sb = ScheduledBroadcast( + station='USCG Kodiak', + callsign='NOJ', + frequency_khz=4298.0, + utc_time=now.strftime('%H:%M'), + duration_min=0, + content='Late chart', + occurrence_date=now.date().isoformat(), + ) + sb.status = 'scheduled' + + mock_decoder = MagicMock() + mock_decoder.is_running = False + mock_decoder.start.return_value = True + + with patch('utils.wefax_scheduler.get_wefax_decoder', return_value=mock_decoder), \ + patch('utils.wefax_scheduler.WEFAX_CAPTURE_BUFFER_SECONDS', 0), \ + patch('app.claim_sdr_device', return_value=None), \ + patch.object(scheduler, '_stop_capture') as mock_stop_capture: + scheduler._execute_capture_inner(sb) + + mock_stop_capture.assert_called_once() + + @patch('threading.Timer') + def test_terminal_progress_releases_scheduler_device_early(self, mock_timer): + """Scheduler captures must release SDR as soon as terminal progress arrives.""" + scheduler = WeFaxScheduler() + scheduler._enabled = True + scheduler._callsign = 'NOJ' + scheduler._frequency_khz = 4298.0 + scheduler._device = 0 + scheduler._gain = 40.0 + scheduler._ioc = 576 + scheduler._lpm = 120 + scheduler._direct_sampling = True + + sb = ScheduledBroadcast( + station='USCG Kodiak', + callsign='NOJ', + frequency_khz=4298.0, + utc_time='12:00', + duration_min=20, + content='Chart', + occurrence_date='2026-01-01', + ) + sb.status = 'scheduled' + + mock_decoder = MagicMock() + mock_decoder.is_running = False + mock_decoder.start.return_value = True + mock_timer.return_value = MagicMock() + + with patch('utils.wefax_scheduler.get_wefax_decoder', return_value=mock_decoder), \ + patch('app.claim_sdr_device', return_value=None), \ + patch('app.release_sdr_device') as mock_release: + scheduler._execute_capture_inner(sb) + progress_cb = mock_decoder.set_callback.call_args[0][0] + progress_cb({ + 'type': 'wefax_progress', + 'status': 'error', + 'message': 'rtl_fm failed', + }) + + mock_release.assert_called_once_with(0) + assert sb.status == 'skipped' + + def test_stop_capture_non_capturing_only_releases(self): + """_stop_capture should be idempotent when capture already ended.""" + scheduler = WeFaxScheduler() + sb = ScheduledBroadcast( + station='USCG Kodiak', + callsign='NOJ', + frequency_khz=4298.0, + utc_time='12:00', + duration_min=20, + content='Chart', + occurrence_date='2026-01-01', + ) + sb.status = 'complete' + release_fn = MagicMock() + + with patch('utils.wefax_scheduler.get_wefax_decoder') as mock_get_decoder: + scheduler._stop_capture(sb, release_fn) + + release_fn.assert_called_once() + mock_get_decoder.assert_not_called() diff --git a/utils/weather_sat.py b/utils/weather_sat.py index 9ebbdee..f30b36d 100644 --- a/utils/weather_sat.py +++ b/utils/weather_sat.py @@ -173,7 +173,7 @@ class WeatherSatDecoder: self._current_frequency: float = 0.0 self._current_mode: str = '' self._capture_start_time: float = 0 - self._device_index: int = 0 + self._device_index: int = -1 self._capture_output_dir: Path | None = None self._on_complete_callback: Callable[[], None] | None = None self._capture_phase: str = 'idle' @@ -303,12 +303,13 @@ class WeatherSatDecoder: )) return False - self._current_satellite = satellite - self._current_frequency = sat_info['frequency'] - self._current_mode = sat_info['mode'] - self._capture_start_time = time.time() - self._capture_phase = 'decoding' - self._stop_event.clear() + self._current_satellite = satellite + self._current_frequency = sat_info['frequency'] + self._current_mode = sat_info['mode'] + self._device_index = -1 # Offline decode does not claim an SDR device + self._capture_start_time = time.time() + self._capture_phase = 'decoding' + self._stop_event.clear() try: self._running = True @@ -464,15 +465,18 @@ class WeatherSatDecoder: master_fd, slave_fd = pty.openpty() self._pty_master_fd = master_fd - self._process = subprocess.Popen( - cmd, - stdout=slave_fd, - stderr=slave_fd, - stdin=subprocess.DEVNULL, - close_fds=True, - ) - register_process(self._process) - os.close(slave_fd) # parent doesn't need the slave side + self._process = subprocess.Popen( + cmd, + stdout=slave_fd, + stderr=slave_fd, + stdin=subprocess.DEVNULL, + close_fds=True, + ) + register_process(self._process) + try: + os.close(slave_fd) # parent doesn't need the slave side + except OSError: + pass # Check for early exit asynchronously (avoid blocking /start for 3s) def _check_early_exit(): @@ -564,15 +568,18 @@ class WeatherSatDecoder: master_fd, slave_fd = pty.openpty() self._pty_master_fd = master_fd - self._process = subprocess.Popen( - cmd, - stdout=slave_fd, - stderr=slave_fd, - stdin=subprocess.DEVNULL, - close_fds=True, - ) - register_process(self._process) - os.close(slave_fd) # parent doesn't need the slave side + self._process = subprocess.Popen( + cmd, + stdout=slave_fd, + stderr=slave_fd, + stdin=subprocess.DEVNULL, + close_fds=True, + ) + register_process(self._process) + try: + os.close(slave_fd) # parent doesn't need the slave side + except OSError: + pass # For offline mode, don't check for early exit — file decoding # may complete very quickly and exit code 0 is normal success. @@ -892,16 +899,24 @@ class WeatherSatDecoder: except OSError: continue - # Determine product type from filename/path - product = self._parse_product_name(filepath) - - # Copy image to main output dir for serving - serve_name = f"{self._current_satellite}_{filepath.stem}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" - serve_path = self._output_dir / serve_name - try: - shutil.copy2(filepath, serve_path) - except OSError: - # Copy failed — don't mark as known so it can be retried + # Determine product type from filename/path + product = self._parse_product_name(filepath) + + # Copy image to main output dir for serving + safe_sat = re.sub(r'[^A-Za-z0-9_-]+', '_', self._current_satellite).strip('_') or 'satellite' + safe_stem = re.sub(r'[^A-Za-z0-9_-]+', '_', filepath.stem).strip('_') or 'image' + suffix = filepath.suffix.lower() + if suffix not in ('.png', '.jpg', '.jpeg'): + suffix = '.png' + serve_name = ( + f"{safe_sat}_{safe_stem}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}" + f"{suffix}" + ) + serve_path = self._output_dir / serve_name + try: + shutil.copy2(filepath, serve_path) + except OSError: + # Copy failed — don't mark as known so it can be retried continue # Only mark as known after successful copy @@ -945,12 +960,12 @@ class WeatherSatDecoder: return 'Multispectral Analysis' if 'thermal' in name or 'temp' in name: return 'Thermal' - if 'ndvi' in name: - return 'NDVI Vegetation' - if 'channel' in name or 'ch' in name: - match = re.search(r'(?:channel|ch)\s*(\d+)', name) - if match: - return f'Channel {match.group(1)}' + if 'ndvi' in name: + return 'NDVI Vegetation' + if 'channel' in name or 'ch' in name: + match = re.search(r'(?:channel|ch)[\s_-]*(\d+)', name) + if match: + return f'Channel {match.group(1)}' if 'avhrr' in name: return 'AVHRR' if 'msu' in name or 'mtvza' in name: @@ -972,12 +987,13 @@ class WeatherSatDecoder: self._stop_event.set() self._close_pty() - if self._process: - safe_terminate(self._process) - self._process = None - - elapsed = int(time.time() - self._capture_start_time) if self._capture_start_time else 0 - logger.info(f"Weather satellite capture stopped after {elapsed}s") + if self._process: + safe_terminate(self._process) + self._process = None + + elapsed = int(time.time() - self._capture_start_time) if self._capture_start_time else 0 + logger.info(f"Weather satellite capture stopped after {elapsed}s") + self._device_index = -1 def get_images(self) -> list[WeatherSatImage]: """Get list of decoded images.""" @@ -1013,17 +1029,18 @@ class WeatherSatDecoder: sat_info = WEATHER_SATELLITES.get(satellite, {}) - image = WeatherSatImage( - filename=filepath.name, - path=filepath, - satellite=satellite, - mode=sat_info.get('mode', 'Unknown'), + image = WeatherSatImage( + filename=filepath.name, + path=filepath, + satellite=satellite, + mode=sat_info.get('mode', 'Unknown'), timestamp=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc), frequency=sat_info.get('frequency', 0.0), - size_bytes=stat.st_size, - product=self._parse_product_name(filepath), - ) - self._images.append(image) + size_bytes=stat.st_size, + product=self._parse_product_name(filepath), + ) + self._images.append(image) + known_filenames.add(filepath.name) def delete_image(self, filename: str) -> bool: """Delete a decoded image.""" diff --git a/utils/weather_sat_scheduler.py b/utils/weather_sat_scheduler.py index 898019c..c214090 100644 --- a/utils/weather_sat_scheduler.py +++ b/utils/weather_sat_scheduler.py @@ -319,12 +319,26 @@ class WeatherSatScheduler: if self._progress_callback: decoder.set_callback(self._progress_callback) - def _release_device(): - try: - import app as app_module - app_module.release_sdr_device(self._device) - except ImportError: - pass + def _release_device(): + try: + import app as app_module + owner = None + get_status = getattr(app_module, 'get_sdr_device_status', None) + if callable(get_status): + try: + owner = get_status().get(self._device) + except Exception: + owner = None + if owner and owner != 'weather_sat': + logger.debug( + "Skipping SDR release for device %s owned by %s", + self._device, + owner, + ) + return + app_module.release_sdr_device(self._device) + except ImportError: + pass decoder.set_on_complete(lambda: self._on_capture_complete(sp, _release_device)) diff --git a/utils/wefax_scheduler.py b/utils/wefax_scheduler.py index 3f8d9e5..91c5ffc 100644 --- a/utils/wefax_scheduler.py +++ b/utils/wefax_scheduler.py @@ -31,28 +31,30 @@ except ImportError: WEFAX_CAPTURE_BUFFER_SECONDS = 30 -class ScheduledBroadcast: +class ScheduledBroadcast: """A broadcast scheduled for automatic capture.""" - def __init__( - self, - station: str, - callsign: str, - frequency_khz: float, - utc_time: str, - duration_min: int, - content: str, - ): - self.id: str = str(uuid.uuid4())[:8] - self.station = station - self.callsign = callsign - self.frequency_khz = frequency_khz - self.utc_time = utc_time - self.duration_min = duration_min - self.content = content - self.status: str = 'scheduled' # scheduled, capturing, complete, skipped - self._timer: threading.Timer | None = None - self._stop_timer: threading.Timer | None = None + def __init__( + self, + station: str, + callsign: str, + frequency_khz: float, + utc_time: str, + duration_min: int, + content: str, + occurrence_date: str = '', + ): + self.id: str = str(uuid.uuid4())[:8] + self.station = station + self.callsign = callsign + self.frequency_khz = frequency_khz + self.utc_time = utc_time + self.duration_min = duration_min + self.content = content + self.occurrence_date = occurrence_date + self.status: str = 'scheduled' # scheduled, capturing, complete, skipped + self._timer: threading.Timer | None = None + self._stop_timer: threading.Timer | None = None def to_dict(self) -> dict[str, Any]: return { @@ -60,14 +62,15 @@ class ScheduledBroadcast: 'station': self.station, 'callsign': self.callsign, 'frequency_khz': self.frequency_khz, - 'utc_time': self.utc_time, - 'duration_min': self.duration_min, - 'content': self.content, - 'status': self.status, - } + 'utc_time': self.utc_time, + 'duration_min': self.duration_min, + 'content': self.content, + 'occurrence_date': self.occurrence_date, + 'status': self.status, + } -class WeFaxScheduler: +class WeFaxScheduler: """Auto-scheduler for WeFax broadcast captures.""" def __init__(self): @@ -204,10 +207,15 @@ class WeFaxScheduler: 'total_broadcasts': len(self._broadcasts), } - def get_broadcasts(self) -> list[dict[str, Any]]: - """Get list of scheduled broadcasts.""" - with self._lock: - return [b.to_dict() for b in self._broadcasts] + def get_broadcasts(self) -> list[dict[str, Any]]: + """Get list of scheduled broadcasts.""" + with self._lock: + return [b.to_dict() for b in self._broadcasts] + + @staticmethod + def _history_key(callsign: str, utc_time: str, occurrence_date: str) -> str: + """Build a stable key for one station UTC slot on one calendar day.""" + return f'{callsign}_{utc_time}_{occurrence_date}' def _refresh_schedule(self) -> None: """Recompute broadcast schedule and set timers.""" @@ -269,24 +277,34 @@ class WeFaxScheduler: minutes=duration_min, seconds=buffer ) - capture_start = broadcast_dt - timedelta(seconds=buffer) - - # Check if already in history - history_key = f"{self._callsign}_{utc_time}" - if any( - f"{h.callsign}_{h.utc_time}" == history_key - for h in history - ): - continue - - sb = ScheduledBroadcast( + capture_start = broadcast_dt - timedelta(seconds=buffer) + occurrence_date = broadcast_dt.date().isoformat() + + # Check if this specific day/slot was already processed. + history_key = self._history_key( + self._callsign, + utc_time, + occurrence_date, + ) + if any( + self._history_key( + h.callsign, + h.utc_time, + getattr(h, 'occurrence_date', ''), + ) == history_key + for h in history + ): + continue + + sb = ScheduledBroadcast( station=self._station, callsign=self._callsign, - frequency_khz=self._frequency_khz, - utc_time=utc_time, - duration_min=duration_min, - content=content, - ) + frequency_khz=self._frequency_khz, + utc_time=utc_time, + duration_min=duration_min, + content=content, + occurrence_date=occurrence_date, + ) # Schedule capture timer delay = max(0.0, (capture_start - now).total_seconds()) @@ -371,18 +389,57 @@ class WeFaxScheduler: except ImportError: pass - sb.status = 'capturing' - - # Set up callbacks - if self._progress_callback: - decoder.set_callback(self._progress_callback) - - def _release_device(): - try: - import app as app_module - app_module.release_sdr_device(self._device) - except ImportError: - pass + sb.status = 'capturing' + + def _release_device(): + try: + import app as app_module + app_module.release_sdr_device(self._device) + except ImportError: + pass + + released = False + release_lock = threading.Lock() + + def _release_device_once() -> None: + nonlocal released + with release_lock: + if released: + return + released = True + _release_device() + + def _scheduler_progress_callback(progress: dict) -> None: + """Forward progress updates and release scheduler resources on terminal states.""" + if self._progress_callback: + self._progress_callback(progress) + + if not isinstance(progress, dict) or progress.get('type') != 'wefax_progress': + return + + status = progress.get('status') + if status not in ('complete', 'error', 'stopped'): + return + + if sb.status == 'capturing': + if status == 'complete': + sb.status = 'complete' + self._emit_event({ + 'type': 'schedule_capture_complete', + 'broadcast': sb.to_dict(), + }) + else: + sb.status = 'skipped' + self._emit_event({ + 'type': 'schedule_capture_skipped', + 'broadcast': sb.to_dict(), + 'reason': 'decoder_error', + 'detail': progress.get('message', ''), + }) + + _release_device_once() + + decoder.set_callback(_scheduler_progress_callback) success = decoder.start( frequency_khz=self._frequency_khz, @@ -414,39 +471,53 @@ class WeFaxScheduler: minutes=sb.duration_min, seconds=WEFAX_CAPTURE_BUFFER_SECONDS, ) - stop_delay = max(0.0, (stop_dt - now).total_seconds()) - - if stop_delay > 0: - sb._stop_timer = threading.Timer( - stop_delay, self._stop_capture, args=[sb, _release_device] - ) - sb._stop_timer.daemon = True - sb._stop_timer.start() - else: - sb.status = 'skipped' - _release_device() - self._emit_event({ - 'type': 'schedule_capture_skipped', - 'broadcast': sb.to_dict(), - 'reason': 'start_failed', - 'detail': decoder.last_error or 'unknown error', + stop_delay = max(0.0, (stop_dt - now).total_seconds()) + + if stop_delay > 0: + sb._stop_timer = threading.Timer( + stop_delay, self._stop_capture, args=[sb, _release_device_once] + ) + sb._stop_timer.daemon = True + sb._stop_timer.start() + else: + # If execution was delayed beyond end-of-window, close out + # immediately so SDR allocation is never stranded. + logger.warning( + "Capture window already elapsed for %s at %s UTC; stopping immediately", + sb.content, + sb.utc_time, + ) + self._stop_capture(sb, _release_device_once) + else: + sb.status = 'skipped' + _release_device_once() + self._emit_event({ + 'type': 'schedule_capture_skipped', + 'broadcast': sb.to_dict(), + 'reason': 'start_failed', + 'detail': decoder.last_error or 'unknown error', }) - def _stop_capture( - self, sb: ScheduledBroadcast, release_fn: Callable - ) -> None: - """Stop capture at broadcast end.""" - decoder = get_wefax_decoder() - if decoder.is_running: - decoder.stop() - logger.info("Auto-scheduler stopped capture: %s", sb.content) - - sb.status = 'complete' - release_fn() - self._emit_event({ - 'type': 'schedule_capture_complete', - 'broadcast': sb.to_dict(), - }) + def _stop_capture( + self, sb: ScheduledBroadcast, release_fn: Callable + ) -> None: + """Stop capture at broadcast end.""" + if sb.status != 'capturing': + release_fn() + return + + sb.status = 'complete' + + decoder = get_wefax_decoder() + if decoder.is_running: + decoder.stop() + logger.info("Auto-scheduler stopped capture: %s", sb.content) + + release_fn() + self._emit_event({ + 'type': 'schedule_capture_complete', + 'broadcast': sb.to_dict(), + }) def _emit_event(self, event: dict[str, Any]) -> None: """Emit scheduler event to callback."""