From 4607c358edfa5f4816fe057107e7c5ef8ec5e08f Mon Sep 17 00:00:00 2001 From: James Smith Date: Wed, 18 Mar 2026 17:36:55 +0000 Subject: [PATCH] Add ground station automation with 6-phase implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 - Automated observation engine: - utils/ground_station/scheduler.py: GroundStationScheduler fires at AOS/LOS, claims SDR, manages IQBus lifecycle, emits SSE events - utils/ground_station/observation_profile.py: ObservationProfile dataclass + DB CRUD - routes/ground_station.py: REST API for profiles, scheduler, observations, recordings, rotator; SSE stream; /ws/satellite_waterfall WebSocket - DB tables: observation_profiles, ground_station_observations, ground_station_events, sigmf_recordings (added to utils/database.py init_db) - app.py: ground_station_queue, WebSocket init, scheduler startup in _deferred_init - routes/__init__.py: register ground_station_bp Phase 2 - Doppler correction: - utils/doppler.py: generalized DopplerTracker extracted from sstv_decoder.py; accepts satellite name or raw TLE tuple; thread-safe; update_tle() method - utils/sstv/sstv_decoder.py: replace inline DopplerTracker with import from utils.doppler - Scheduler runs 5s retune loop; calls rotator.point_to() if enabled Phase 3 - IQ recording (SigMF): - utils/sigmf.py: SigMFWriter writes .sigmf-data + .sigmf-meta; disk-free guard (500MB) - utils/ground_station/consumers/sigmf_writer.py: SigMFConsumer wraps SigMFWriter Phase 4 - Multi-decoder IQ broadcast pipeline: - utils/ground_station/iq_bus.py: IQBus single-producer fan-out; IQConsumer Protocol - utils/ground_station/consumers/waterfall.py: CU8→FFT→binary frames - utils/ground_station/consumers/fm_demod.py: CU8→FM demod (numpy)→decoder subprocess - utils/ground_station/consumers/gr_satellites.py: CU8→cf32→gr_satellites (optional) Phase 5 - Live spectrum waterfall: - static/js/modes/ground_station_waterfall.js: /ws/satellite_waterfall canvas renderer - Waterfall panel in satellite dashboard sidebar, auto-shown on iq_bus_started SSE event Phase 6 - Antenna rotator control (optional): - utils/rotator.py: RotatorController TCP client for rotctld (Hamlib line protocol) - Rotator panel in satellite dashboard; silently disabled if rotctld unreachable Also fixes pre-existing test_weather_sat_predict.py breakage: - utils/weather_sat_predict.py: rewritten with self-contained skyfield implementation using find_discrete (matching what committed tests expected); adds _format_utc_iso - tests/test_weather_sat_predict.py: add _MOCK_WEATHER_SATS and @patch decorators for tests that assumed NOAA-18 active (decommissioned Jun 2025, now active=False) Co-Authored-By: Claude Sonnet 4.6 --- app.py | 26 + routes/__init__.py | 2 + routes/ground_station.py | 415 +++++++++ static/js/modes/ground_station_waterfall.js | 233 +++++ templates/satellite_dashboard.html | 323 +++++++ tests/test_weather_sat_predict.py | 23 + utils/database.py | 76 ++ utils/doppler.py | 195 +++++ utils/ground_station/__init__.py | 12 + utils/ground_station/consumers/__init__.py | 1 + utils/ground_station/consumers/fm_demod.py | 219 +++++ .../ground_station/consumers/gr_satellites.py | 154 ++++ .../ground_station/consumers/sigmf_writer.py | 75 ++ utils/ground_station/consumers/waterfall.py | 123 +++ utils/ground_station/iq_bus.py | 307 +++++++ utils/ground_station/observation_profile.py | 140 +++ utils/ground_station/scheduler.py | 794 ++++++++++++++++++ utils/rotator.py | 194 +++++ utils/sigmf.py | 208 +++++ utils/sstv/sstv_decoder.py | 126 +-- utils/weather_sat_predict.py | 244 ++++-- 21 files changed, 3709 insertions(+), 181 deletions(-) create mode 100644 routes/ground_station.py create mode 100644 static/js/modes/ground_station_waterfall.js create mode 100644 utils/doppler.py create mode 100644 utils/ground_station/__init__.py create mode 100644 utils/ground_station/consumers/__init__.py create mode 100644 utils/ground_station/consumers/fm_demod.py create mode 100644 utils/ground_station/consumers/gr_satellites.py create mode 100644 utils/ground_station/consumers/sigmf_writer.py create mode 100644 utils/ground_station/consumers/waterfall.py create mode 100644 utils/ground_station/iq_bus.py create mode 100644 utils/ground_station/observation_profile.py create mode 100644 utils/ground_station/scheduler.py create mode 100644 utils/rotator.py create mode 100644 utils/sigmf.py diff --git a/app.py b/app.py index 5ca43fe..7a8f61d 100644 --- a/app.py +++ b/app.py @@ -274,6 +274,9 @@ dsc_lock = threading.Lock() tscm_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) tscm_lock = threading.Lock() +# Ground Station automation +ground_station_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) + # SubGHz Transceiver (HackRF) subghz_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) subghz_lock = threading.Lock() @@ -1149,6 +1152,13 @@ def _init_app() -> None: except ImportError: pass + # Initialize WebSocket for ground station live waterfall + try: + from routes.ground_station import init_ground_station_websocket + init_ground_station_websocket(app) + except ImportError: + pass + # Defer heavy/network operations so the worker can serve requests immediately import threading @@ -1196,6 +1206,22 @@ def _init_app() -> None: except Exception as e: logger.warning(f"SatNOGS prefetch failed: {e}") + # Wire ground station scheduler event → SSE queue + try: + import app as _self + from utils.ground_station.scheduler import get_ground_station_scheduler + gs_scheduler = get_ground_station_scheduler() + + def _gs_event_to_sse(event: dict) -> None: + try: + _self.ground_station_queue.put_nowait(event) + except Exception: + pass + + gs_scheduler.set_event_callback(_gs_event_to_sse) + except Exception as e: + logger.warning(f"Ground station scheduler init failed: {e}") + threading.Thread(target=_deferred_init, daemon=True).start() diff --git a/routes/__init__.py b/routes/__init__.py index 7007cc2..0cdc50f 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -25,6 +25,7 @@ def register_blueprints(app): from .meteor_websocket import meteor_bp from .morse import morse_bp from .offline import offline_bp + from .ground_station import ground_station_bp from .ook import ook_bp from .pager import pager_bp from .radiosonde import radiosonde_bp @@ -89,6 +90,7 @@ def register_blueprints(app): app.register_blueprint(radiosonde_bp) # Radiosonde weather balloon tracking app.register_blueprint(system_bp) # System health monitoring app.register_blueprint(ook_bp) # Generic OOK signal decoder + app.register_blueprint(ground_station_bp) # Ground station automation # Exempt all API blueprints from CSRF (they use JSON, not form tokens) if _csrf: diff --git a/routes/ground_station.py b/routes/ground_station.py new file mode 100644 index 0000000..6b77bfb --- /dev/null +++ b/routes/ground_station.py @@ -0,0 +1,415 @@ +"""Ground Station REST API + SSE + WebSocket endpoints. + +Phases implemented here: + 1 — Profile CRUD, scheduler control, observation history, SSE stream + 3 — SigMF recording browser (list / download / delete) + 5 — /ws/satellite_waterfall WebSocket + 6 — Rotator config / status / point / park endpoints +""" + +from __future__ import annotations + +import json +import queue +import threading +from pathlib import Path + +from flask import Blueprint, Response, jsonify, request, send_file + +from utils.logging import get_logger +from utils.sse import sse_stream_fanout + +logger = get_logger('intercept.ground_station.routes') + +ground_station_bp = Blueprint('ground_station', __name__, url_prefix='/ground_station') + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _get_scheduler(): + from utils.ground_station.scheduler import get_ground_station_scheduler + return get_ground_station_scheduler() + + +def _get_queue(): + import app as _app + return getattr(_app, 'ground_station_queue', None) or queue.Queue() + + +# --------------------------------------------------------------------------- +# Phase 1 — Observation Profiles +# --------------------------------------------------------------------------- + + +@ground_station_bp.route('/profiles', methods=['GET']) +def list_profiles(): + from utils.ground_station.observation_profile import list_profiles as _list + return jsonify([p.to_dict() for p in _list()]) + + +@ground_station_bp.route('/profiles/', methods=['GET']) +def get_profile(norad_id: int): + from utils.ground_station.observation_profile import get_profile as _get + p = _get(norad_id) + if not p: + return jsonify({'error': f'No profile for NORAD {norad_id}'}), 404 + return jsonify(p.to_dict()) + + +@ground_station_bp.route('/profiles', methods=['POST']) +def create_profile(): + data = request.get_json(force=True) or {} + try: + _validate_profile(data) + except ValueError as e: + return jsonify({'error': str(e)}), 400 + + from utils.ground_station.observation_profile import ObservationProfile, save_profile + profile = ObservationProfile( + norad_id=int(data['norad_id']), + name=str(data['name']), + frequency_mhz=float(data['frequency_mhz']), + decoder_type=str(data.get('decoder_type', 'fm')), + gain=float(data.get('gain', 40.0)), + bandwidth_hz=int(data.get('bandwidth_hz', 200_000)), + min_elevation=float(data.get('min_elevation', 10.0)), + enabled=bool(data.get('enabled', True)), + record_iq=bool(data.get('record_iq', False)), + iq_sample_rate=int(data.get('iq_sample_rate', 2_400_000)), + ) + saved = save_profile(profile) + return jsonify(saved.to_dict()), 201 + + +@ground_station_bp.route('/profiles/', methods=['PUT']) +def update_profile(norad_id: int): + data = request.get_json(force=True) or {} + from utils.ground_station.observation_profile import get_profile as _get, save_profile + existing = _get(norad_id) + if not existing: + return jsonify({'error': f'No profile for NORAD {norad_id}'}), 404 + + # Apply updates + for field, cast in [ + ('name', str), ('frequency_mhz', float), ('decoder_type', str), + ('gain', float), ('bandwidth_hz', int), ('min_elevation', float), + ]: + if field in data: + setattr(existing, field, cast(data[field])) + for field in ('enabled', 'record_iq'): + if field in data: + setattr(existing, field, bool(data[field])) + if 'iq_sample_rate' in data: + existing.iq_sample_rate = int(data['iq_sample_rate']) + + saved = save_profile(existing) + return jsonify(saved.to_dict()) + + +@ground_station_bp.route('/profiles/', methods=['DELETE']) +def delete_profile(norad_id: int): + from utils.ground_station.observation_profile import delete_profile as _del + ok = _del(norad_id) + if not ok: + return jsonify({'error': f'No profile for NORAD {norad_id}'}), 404 + return jsonify({'status': 'deleted', 'norad_id': norad_id}) + + +# --------------------------------------------------------------------------- +# Phase 1 — Scheduler control +# --------------------------------------------------------------------------- + + +@ground_station_bp.route('/scheduler/status', methods=['GET']) +def scheduler_status(): + return jsonify(_get_scheduler().get_status()) + + +@ground_station_bp.route('/scheduler/enable', methods=['POST']) +def scheduler_enable(): + data = request.get_json(force=True) or {} + try: + lat = float(data.get('lat', 0.0)) + lon = float(data.get('lon', 0.0)) + device = int(data.get('device', 0)) + sdr_type = str(data.get('sdr_type', 'rtlsdr')) + except (TypeError, ValueError) as e: + return jsonify({'error': str(e)}), 400 + + status = _get_scheduler().enable(lat=lat, lon=lon, device=device, sdr_type=sdr_type) + return jsonify(status) + + +@ground_station_bp.route('/scheduler/disable', methods=['POST']) +def scheduler_disable(): + return jsonify(_get_scheduler().disable()) + + +@ground_station_bp.route('/scheduler/observations', methods=['GET']) +def get_observations(): + return jsonify(_get_scheduler().get_scheduled_observations()) + + +@ground_station_bp.route('/scheduler/trigger/', methods=['POST']) +def trigger_manual(norad_id: int): + ok, msg = _get_scheduler().trigger_manual(norad_id) + if not ok: + return jsonify({'error': msg}), 400 + return jsonify({'status': 'started', 'message': msg}) + + +@ground_station_bp.route('/scheduler/stop', methods=['POST']) +def stop_active(): + return jsonify(_get_scheduler().stop_active()) + + +# --------------------------------------------------------------------------- +# Phase 1 — Observation history (from DB) +# --------------------------------------------------------------------------- + + +@ground_station_bp.route('/observations', methods=['GET']) +def observation_history(): + limit = min(int(request.args.get('limit', 50)), 200) + try: + from utils.database import get_db + with get_db() as conn: + rows = conn.execute( + '''SELECT * FROM ground_station_observations + ORDER BY created_at DESC LIMIT ?''', + (limit,), + ).fetchall() + return jsonify([dict(r) for r in rows]) + except Exception as e: + logger.error(f"Failed to fetch observation history: {e}") + return jsonify([]) + + +# --------------------------------------------------------------------------- +# Phase 1 — SSE stream +# --------------------------------------------------------------------------- + + +@ground_station_bp.route('/stream') +def sse_stream(): + gs_queue = _get_queue() + return Response( + sse_stream_fanout(gs_queue, 'ground_station'), + mimetype='text/event-stream', + headers={ + 'Cache-Control': 'no-cache', + 'X-Accel-Buffering': 'no', + }, + ) + + +# --------------------------------------------------------------------------- +# Phase 3 — SigMF recording browser +# --------------------------------------------------------------------------- + + +@ground_station_bp.route('/recordings', methods=['GET']) +def list_recordings(): + try: + from utils.database import get_db + with get_db() as conn: + rows = conn.execute( + 'SELECT * FROM sigmf_recordings ORDER BY created_at DESC LIMIT 100' + ).fetchall() + return jsonify([dict(r) for r in rows]) + except Exception as e: + logger.error(f"Failed to fetch recordings: {e}") + return jsonify([]) + + +@ground_station_bp.route('/recordings/', methods=['GET']) +def get_recording(rec_id: int): + try: + from utils.database import get_db + with get_db() as conn: + row = conn.execute( + 'SELECT * FROM sigmf_recordings WHERE id=?', (rec_id,) + ).fetchone() + if not row: + return jsonify({'error': 'Not found'}), 404 + return jsonify(dict(row)) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@ground_station_bp.route('/recordings/', methods=['DELETE']) +def delete_recording(rec_id: int): + try: + from utils.database import get_db + with get_db() as conn: + row = conn.execute( + 'SELECT sigmf_data_path, sigmf_meta_path FROM sigmf_recordings WHERE id=?', + (rec_id,), + ).fetchone() + if not row: + return jsonify({'error': 'Not found'}), 404 + # Remove files + for path_col in ('sigmf_data_path', 'sigmf_meta_path'): + p = Path(row[path_col]) + if p.exists(): + p.unlink(missing_ok=True) + conn.execute('DELETE FROM sigmf_recordings WHERE id=?', (rec_id,)) + return jsonify({'status': 'deleted', 'id': rec_id}) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@ground_station_bp.route('/recordings//download/') +def download_recording(rec_id: int, file_type: str): + if file_type not in ('data', 'meta'): + return jsonify({'error': 'file_type must be data or meta'}), 400 + try: + from utils.database import get_db + with get_db() as conn: + row = conn.execute( + 'SELECT sigmf_data_path, sigmf_meta_path FROM sigmf_recordings WHERE id=?', + (rec_id,), + ).fetchone() + if not row: + return jsonify({'error': 'Not found'}), 404 + + col = 'sigmf_data_path' if file_type == 'data' else 'sigmf_meta_path' + p = Path(row[col]) + if not p.exists(): + return jsonify({'error': 'File not found on disk'}), 404 + + mimetype = 'application/octet-stream' if file_type == 'data' else 'application/json' + return send_file(p, mimetype=mimetype, as_attachment=True, download_name=p.name) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +# --------------------------------------------------------------------------- +# Phase 5 — Live waterfall WebSocket +# --------------------------------------------------------------------------- + + +def init_ground_station_websocket(app) -> None: + """Register the /ws/satellite_waterfall WebSocket endpoint.""" + try: + from flask_sock import Sock + except ImportError: + logger.warning("flask-sock not installed — satellite waterfall WebSocket disabled") + return + + sock = Sock(app) + + @sock.route('/ws/satellite_waterfall') + def satellite_waterfall_ws(ws): + """Stream binary waterfall frames from the active ground station IQ bus.""" + import app as _app_mod + scheduler = _get_scheduler() + wf_queue = scheduler.waterfall_queue + + from utils.sse import subscribe_fanout_queue + sub_queue, unsubscribe = subscribe_fanout_queue( + source_queue=wf_queue, + channel_key='gs_waterfall', + subscriber_queue_size=120, + ) + + try: + while True: + try: + frame = sub_queue.get(timeout=1.0) + try: + ws.send(frame) + except Exception: + break + except queue.Empty: + if not ws.connected: + break + finally: + unsubscribe() + + +# --------------------------------------------------------------------------- +# Phase 6 — Rotator +# --------------------------------------------------------------------------- + + +@ground_station_bp.route('/rotator/status', methods=['GET']) +def rotator_status(): + from utils.rotator import get_rotator + return jsonify(get_rotator().get_status()) + + +@ground_station_bp.route('/rotator/config', methods=['POST']) +def rotator_config(): + data = request.get_json(force=True) or {} + host = str(data.get('host', '127.0.0.1')) + port = int(data.get('port', 4533)) + from utils.rotator import get_rotator + ok = get_rotator().connect(host, port) + if not ok: + return jsonify({'error': f'Could not connect to rotctld at {host}:{port}'}), 503 + return jsonify(get_rotator().get_status()) + + +@ground_station_bp.route('/rotator/point', methods=['POST']) +def rotator_point(): + data = request.get_json(force=True) or {} + try: + az = float(data['az']) + el = float(data['el']) + except (KeyError, TypeError, ValueError) as e: + return jsonify({'error': f'az and el required: {e}'}), 400 + from utils.rotator import get_rotator + ok = get_rotator().point_to(az, el) + if not ok: + return jsonify({'error': 'Rotator command failed'}), 503 + return jsonify({'status': 'ok', 'az': az, 'el': el}) + + +@ground_station_bp.route('/rotator/park', methods=['POST']) +def rotator_park(): + from utils.rotator import get_rotator + ok = get_rotator().park() + if not ok: + return jsonify({'error': 'Rotator park failed'}), 503 + return jsonify({'status': 'parked'}) + + +@ground_station_bp.route('/rotator/disconnect', methods=['POST']) +def rotator_disconnect(): + from utils.rotator import get_rotator + get_rotator().disconnect() + return jsonify({'status': 'disconnected'}) + + +# --------------------------------------------------------------------------- +# Input validation +# --------------------------------------------------------------------------- + + +def _validate_profile(data: dict) -> None: + if 'norad_id' not in data: + raise ValueError("norad_id is required") + if 'name' not in data: + raise ValueError("name is required") + if 'frequency_mhz' not in data: + raise ValueError("frequency_mhz is required") + try: + norad_id = int(data['norad_id']) + if norad_id <= 0: + raise ValueError("norad_id must be positive") + except (TypeError, ValueError): + raise ValueError("norad_id must be a positive integer") + try: + freq = float(data['frequency_mhz']) + if not (0.1 <= freq <= 3000.0): + raise ValueError("frequency_mhz must be between 0.1 and 3000") + except (TypeError, ValueError): + raise ValueError("frequency_mhz must be a number between 0.1 and 3000") + valid_decoders = {'fm', 'afsk', 'gmsk', 'bpsk', 'iq_only'} + dt = str(data.get('decoder_type', 'fm')) + if dt not in valid_decoders: + raise ValueError(f"decoder_type must be one of: {', '.join(sorted(valid_decoders))}") diff --git a/static/js/modes/ground_station_waterfall.js b/static/js/modes/ground_station_waterfall.js new file mode 100644 index 0000000..01b7568 --- /dev/null +++ b/static/js/modes/ground_station_waterfall.js @@ -0,0 +1,233 @@ +/** + * Ground Station Live Waterfall — Phase 5 + * + * Subscribes to /ws/satellite_waterfall, receives binary frames in the same + * wire format as the main listening-post waterfall, and renders them onto the + * element in satellite_dashboard.html. + * + * Wire frame format (matches utils/waterfall_fft.build_binary_frame): + * [uint8 msg_type=0x01] + * [float32 start_freq_mhz] + * [float32 end_freq_mhz] + * [uint16 bin_count] + * [uint8[] bins] — 0=noise floor, 255=strongest signal + */ + +(function () { + 'use strict'; + + const CANVAS_ID = 'gs-waterfall'; + const ROW_HEIGHT = 2; // px per waterfall row + const SCROLL_STEP = ROW_HEIGHT; + + let _ws = null; + let _canvas = null; + let _ctx = null; + let _offscreen = null; // offscreen ImageData buffer + let _reconnectTimer = null; + let _centerMhz = 0; + let _spanMhz = 0; + let _connected = false; + + // ----------------------------------------------------------------------- + // Colour palette — 256-entry RGB array (matches listening-post waterfall) + // ----------------------------------------------------------------------- + const _palette = _buildPalette(); + + function _buildPalette() { + const p = new Uint8Array(256 * 3); + for (let i = 0; i < 256; i++) { + let r, g, b; + if (i < 64) { + // black → dark blue + r = 0; g = 0; b = Math.round(i * 2); + } else if (i < 128) { + // dark blue → cyan + const t = (i - 64) / 64; + r = 0; g = Math.round(t * 200); b = Math.round(128 + t * 127); + } else if (i < 192) { + // cyan → yellow + const t = (i - 128) / 64; + r = Math.round(t * 255); g = 200; b = Math.round(255 - t * 255); + } else { + // yellow → white + const t = (i - 192) / 64; + r = 255; g = 200; b = Math.round(t * 255); + } + p[i * 3] = r; p[i * 3 + 1] = g; p[i * 3 + 2] = b; + } + return p; + } + + // ----------------------------------------------------------------------- + // Public API + // ----------------------------------------------------------------------- + + window.GroundStationWaterfall = { + init, + connect, + disconnect, + isConnected: () => _connected, + setCenterFreq: (mhz, span) => { _centerMhz = mhz; _spanMhz = span; }, + }; + + function init() { + _canvas = document.getElementById(CANVAS_ID); + if (!_canvas) return; + _ctx = _canvas.getContext('2d'); + _resizeCanvas(); + window.addEventListener('resize', _resizeCanvas); + _drawPlaceholder(); + } + + function connect() { + if (_ws && (_ws.readyState === WebSocket.CONNECTING || _ws.readyState === WebSocket.OPEN)) { + return; + } + if (_reconnectTimer) { + clearTimeout(_reconnectTimer); + _reconnectTimer = null; + } + + const proto = location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = `${proto}//${location.host}/ws/satellite_waterfall`; + + try { + _ws = new WebSocket(url); + _ws.binaryType = 'arraybuffer'; + + _ws.onopen = () => { + _connected = true; + _updateStatus('LIVE'); + console.log('[GS Waterfall] WebSocket connected'); + }; + + _ws.onmessage = (evt) => { + if (evt.data instanceof ArrayBuffer) { + _handleFrame(evt.data); + } + }; + + _ws.onclose = () => { + _connected = false; + _updateStatus('DISCONNECTED'); + _scheduleReconnect(); + }; + + _ws.onerror = (e) => { + console.warn('[GS Waterfall] WebSocket error', e); + }; + } catch (e) { + console.error('[GS Waterfall] Failed to create WebSocket', e); + _scheduleReconnect(); + } + } + + function disconnect() { + if (_reconnectTimer) { clearTimeout(_reconnectTimer); _reconnectTimer = null; } + if (_ws) { _ws.close(); _ws = null; } + _connected = false; + _updateStatus('STOPPED'); + _drawPlaceholder(); + } + + // ----------------------------------------------------------------------- + // Frame rendering + // ----------------------------------------------------------------------- + + function _handleFrame(buf) { + const view = new DataView(buf); + if (buf.byteLength < 11) return; + + const msgType = view.getUint8(0); + if (msgType !== 0x01) return; + + // const startFreq = view.getFloat32(1, true); // little-endian + // const endFreq = view.getFloat32(5, true); + const binCount = view.getUint16(9, true); + if (buf.byteLength < 11 + binCount) return; + + const bins = new Uint8Array(buf, 11, binCount); + + if (!_canvas || !_ctx) return; + + const W = _canvas.width; + const H = _canvas.height; + + // Scroll existing image up by ROW_HEIGHT pixels + if (!_offscreen || _offscreen.width !== W || _offscreen.height !== H) { + _offscreen = _ctx.getImageData(0, 0, W, H); + } else { + _offscreen = _ctx.getImageData(0, 0, W, H); + } + + // Shift rows up by ROW_HEIGHT + const data = _offscreen.data; + const rowBytes = W * 4; + data.copyWithin(0, SCROLL_STEP * rowBytes); + + // Write new row(s) at the bottom + const bottom = H - ROW_HEIGHT; + for (let row = 0; row < ROW_HEIGHT; row++) { + const rowStart = (bottom + row) * rowBytes; + for (let x = 0; x < W; x++) { + const binIdx = Math.floor((x / W) * binCount); + const val = bins[Math.min(binIdx, binCount - 1)]; + const pi = val * 3; + const di = rowStart + x * 4; + data[di] = _palette[pi]; + data[di + 1] = _palette[pi + 1]; + data[di + 2] = _palette[pi + 2]; + data[di + 3] = 255; + } + } + + _ctx.putImageData(_offscreen, 0, 0); + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + function _resizeCanvas() { + if (!_canvas) return; + const container = _canvas.parentElement; + if (container) { + _canvas.width = container.clientWidth || 400; + _canvas.height = container.clientHeight || 200; + } + _offscreen = null; + _drawPlaceholder(); + } + + function _drawPlaceholder() { + if (!_ctx || !_canvas) return; + _ctx.fillStyle = '#000a14'; + _ctx.fillRect(0, 0, _canvas.width, _canvas.height); + _ctx.fillStyle = 'rgba(0,212,255,0.3)'; + _ctx.font = '12px monospace'; + _ctx.textAlign = 'center'; + _ctx.fillText('AWAITING SATELLITE PASS', _canvas.width / 2, _canvas.height / 2); + _ctx.textAlign = 'left'; + } + + function _updateStatus(text) { + const el = document.getElementById('gsWaterfallStatus'); + if (el) el.textContent = text; + } + + function _scheduleReconnect(delayMs = 5000) { + if (_reconnectTimer) return; + _reconnectTimer = setTimeout(() => { + _reconnectTimer = null; + connect(); + }, delayMs); + } + + // Auto-init when DOM is ready + if (document.readyState === 'loading') { + document.addEventListener('DOMContentLoaded', init); + } else { + init(); + } +})(); diff --git a/templates/satellite_dashboard.html b/templates/satellite_dashboard.html index 0ed8dbf..3e1a77a 100644 --- a/templates/satellite_dashboard.html +++ b/templates/satellite_dashboard.html @@ -221,6 +221,57 @@ + + +
+
+ GROUND STATION +
+
+
+ +
+ Scheduler + IDLE +
+ + + +
+ + + +
+ +
+ + + + + + +
+
@@ -349,6 +400,42 @@ font-size: 10px; margin-top: 1px; } + + /* Ground Station panel */ + .gs-panel { margin-top: 10px; } + .gs-status-row { + display: flex; + justify-content: space-between; + align-items: center; + padding: 3px 0; + border-bottom: 1px solid rgba(0,212,255,0.06); + font-size: 11px; + } + .gs-obs-item { + display: flex; + justify-content: space-between; + align-items: center; + padding: 4px 0; + font-size: 10px; + border-bottom: 1px solid rgba(0,212,255,0.06); + font-family: var(--font-mono); + } + .gs-obs-item .sat-name { color: var(--text-primary); } + .gs-obs-item .obs-time { color: var(--text-secondary); font-size: 9px; } + .gs-recording-item { + display: flex; + justify-content: space-between; + align-items: center; + padding: 3px 0; + font-size: 10px; + border-bottom: 1px solid rgba(0,212,255,0.06); + } + .gs-recording-item a { + color: var(--accent-cyan); + text-decoration: none; + font-family: var(--font-mono); + } + .gs-recording-item a:hover { text-decoration: underline; } + + + diff --git a/tests/test_weather_sat_predict.py b/tests/test_weather_sat_predict.py index dfd36e9..c42d8e8 100644 --- a/tests/test_weather_sat_predict.py +++ b/tests/test_weather_sat_predict.py @@ -13,6 +13,20 @@ import pytest from utils.weather_sat_predict import _format_utc_iso, predict_passes +# Controlled single-satellite config used by tests that need exactly one active satellite. +# NOAA-18 was decommissioned Jun 2025 and is inactive in the real WEATHER_SATELLITES, +# so tests that assert on satellite-specific fields patch the module-level name. +_MOCK_WEATHER_SATS = { + 'NOAA-18': { + 'name': 'NOAA 18', + 'frequency': 137.9125, + 'mode': 'APT', + 'pipeline': 'noaa_apt', + 'tle_key': 'NOAA-18', + 'active': True, + } +} + class TestPredictPasses: """Tests for predict_passes() function.""" @@ -31,6 +45,7 @@ class TestPredictPasses: assert passes == [] + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') @@ -96,6 +111,7 @@ class TestPredictPasses: assert 'duration' in pass_data assert 'quality' in pass_data + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') @@ -150,6 +166,7 @@ class TestPredictPasses: assert len(passes) == 0 + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') @@ -207,6 +224,7 @@ class TestPredictPasses: assert 'trajectory' in passes[0] assert len(passes[0]['trajectory']) == 30 + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') @@ -281,6 +299,7 @@ class TestPredictPasses: assert 'groundTrack' in passes[0] assert len(passes[0]['groundTrack']) == 60 + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') @@ -336,6 +355,7 @@ class TestPredictPasses: assert passes[0]['quality'] == 'excellent' assert passes[0]['maxEl'] >= 60 + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') @@ -391,6 +411,7 @@ class TestPredictPasses: assert passes[0]['quality'] == 'good' assert 30 <= passes[0]['maxEl'] < 60 + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') @@ -530,6 +551,7 @@ class TestPredictPasses: predict_passes(lat=51.5, lon=-0.1, hours=24, min_elevation=15) # Should not raise + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') @@ -605,6 +627,7 @@ class TestPredictPasses: class TestPassDataStructure: """Tests for pass data structure.""" + @patch('utils.weather_sat_predict.WEATHER_SATELLITES', _MOCK_WEATHER_SATS) @patch('utils.weather_sat_predict.load') @patch('utils.weather_sat_predict.TLE_SATELLITES') @patch('utils.weather_sat_predict.wgs84') diff --git a/utils/database.py b/utils/database.py index 9790efd..5101811 100644 --- a/utils/database.py +++ b/utils/database.py @@ -636,6 +636,82 @@ def init_db() -> None: VALUES ('40069', 'METEOR-M2', NULL, NULL, 1, 1) ''') + # ===================================================================== + # Ground Station Tables (automated observations, IQ recordings) + # ===================================================================== + + # Observation profiles — per-satellite capture configuration + conn.execute(''' + CREATE TABLE IF NOT EXISTS observation_profiles ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + norad_id INTEGER UNIQUE NOT NULL, + name TEXT NOT NULL, + frequency_mhz REAL NOT NULL, + decoder_type TEXT NOT NULL DEFAULT 'fm', + gain REAL DEFAULT 40.0, + bandwidth_hz INTEGER DEFAULT 200000, + min_elevation REAL DEFAULT 10.0, + enabled BOOLEAN DEFAULT 1, + record_iq BOOLEAN DEFAULT 0, + iq_sample_rate INTEGER DEFAULT 2400000, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + ''') + + # Observation history — one row per captured pass + conn.execute(''' + CREATE TABLE IF NOT EXISTS ground_station_observations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + profile_id INTEGER, + norad_id INTEGER NOT NULL, + satellite TEXT NOT NULL, + aos_time TEXT, + los_time TEXT, + status TEXT DEFAULT 'scheduled', + output_path TEXT, + packets_decoded INTEGER DEFAULT 0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (profile_id) REFERENCES observation_profiles(id) ON DELETE SET NULL + ) + ''') + + # Per-observation events (packets decoded, Doppler updates, etc.) + conn.execute(''' + CREATE TABLE IF NOT EXISTS ground_station_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + observation_id INTEGER, + event_type TEXT NOT NULL, + payload_json TEXT, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (observation_id) REFERENCES ground_station_observations(id) ON DELETE CASCADE + ) + ''') + + # SigMF recordings — one row per IQ recording file pair + conn.execute(''' + CREATE TABLE IF NOT EXISTS sigmf_recordings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + observation_id INTEGER, + sigmf_data_path TEXT NOT NULL, + sigmf_meta_path TEXT NOT NULL, + size_bytes INTEGER DEFAULT 0, + sample_rate INTEGER, + center_freq_hz INTEGER, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (observation_id) REFERENCES ground_station_observations(id) ON DELETE SET NULL + ) + ''') + + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_gs_observations_norad + ON ground_station_observations(norad_id, created_at) + ''') + + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_gs_events_observation + ON ground_station_events(observation_id, timestamp) + ''') + logger.info("Database initialized successfully") diff --git a/utils/doppler.py b/utils/doppler.py new file mode 100644 index 0000000..149c8de --- /dev/null +++ b/utils/doppler.py @@ -0,0 +1,195 @@ +"""Generalised Doppler shift calculator for satellite observations. + +Extracted from utils/sstv/sstv_decoder.py and generalised to accept any +satellite by name (looked up in the live TLE cache) or by raw TLE tuple. + +The sstv_decoder module imports DopplerTracker and DopplerInfo from here. +""" + +from __future__ import annotations + +import threading +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone + +from utils.logging import get_logger + +logger = get_logger('intercept.doppler') + +# Speed of light in m/s +SPEED_OF_LIGHT = 299_792_458.0 + +# Default Hz threshold before triggering a retune +DEFAULT_RETUNE_THRESHOLD_HZ = 500 + + +@dataclass +class DopplerInfo: + """Doppler shift information for a satellite observation.""" + + frequency_hz: float + shift_hz: float + range_rate_km_s: float + elevation: float + azimuth: float + timestamp: datetime + + def to_dict(self) -> dict: + return { + 'frequency_hz': self.frequency_hz, + 'shift_hz': round(self.shift_hz, 1), + 'range_rate_km_s': round(self.range_rate_km_s, 3), + 'elevation': round(self.elevation, 1), + 'azimuth': round(self.azimuth, 1), + 'timestamp': self.timestamp.isoformat(), + } + + +class DopplerTracker: + """Real-time Doppler shift calculator for satellite tracking. + + Accepts a satellite by name (looked up in the live TLE cache, falling + back to static data) **or** a raw TLE tuple ``(name, line1, line2)`` + passed via the constructor or via :meth:`update_tle`. + """ + + def __init__( + self, + satellite_name: str = 'ISS', + tle_data: tuple[str, str, str] | None = None, + ): + self._satellite_name = satellite_name + self._tle_data = tle_data + self._observer_lat: float | None = None + self._observer_lon: float | None = None + self._satellite = None + self._observer = None + self._ts = None + self._enabled = False + self._lock = threading.Lock() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def configure(self, latitude: float, longitude: float) -> bool: + """Configure the tracker with an observer location. + + Resolves TLE data, builds the skyfield objects, and marks the + tracker enabled. Returns True on success. + """ + try: + from skyfield.api import EarthSatellite, load, wgs84 + except ImportError: + logger.warning("skyfield not available — Doppler tracking disabled") + return False + + tle = self._resolve_tle() + if tle is None: + logger.error(f"No TLE data for satellite: {self._satellite_name}") + return False + + try: + ts = load.timescale() + satellite = EarthSatellite(tle[1], tle[2], tle[0], ts) + observer = wgs84.latlon(latitude, longitude) + except Exception as e: + logger.error(f"Failed to configure DopplerTracker: {e}") + return False + + with self._lock: + self._ts = ts + self._satellite = satellite + self._observer = observer + self._observer_lat = latitude + self._observer_lon = longitude + self._enabled = True + + logger.info( + f"DopplerTracker configured for {self._satellite_name} " + f"at ({latitude}, {longitude})" + ) + return True + + def update_tle(self, tle_data: tuple[str, str, str]) -> bool: + """Update TLE data and re-configure if already enabled.""" + self._tle_data = tle_data + if ( + self._enabled + and self._observer_lat is not None + and self._observer_lon is not None + ): + return self.configure(self._observer_lat, self._observer_lon) + return True + + @property + def is_enabled(self) -> bool: + return self._enabled + + def calculate(self, nominal_freq_mhz: float) -> DopplerInfo | None: + """Calculate the Doppler-corrected receive frequency. + + Returns a :class:`DopplerInfo` or *None* if the tracker is not + enabled or the calculation fails. + """ + with self._lock: + if not self._enabled or self._satellite is None or self._observer is None: + return None + ts = self._ts + satellite = self._satellite + observer = self._observer + + try: + t = ts.now() + difference = satellite - observer + topocentric = difference.at(t) + alt, az, distance = topocentric.altaz() + + dt_seconds = 1.0 + t_future = ts.utc(t.utc_datetime() + timedelta(seconds=dt_seconds)) + topocentric_future = difference.at(t_future) + _, _, distance_future = topocentric_future.altaz() + + range_rate_km_s = (distance_future.km - distance.km) / dt_seconds + nominal_freq_hz = nominal_freq_mhz * 1_000_000 + doppler_factor = 1.0 - (range_rate_km_s * 1000.0 / SPEED_OF_LIGHT) + corrected_freq_hz = nominal_freq_hz * doppler_factor + shift_hz = corrected_freq_hz - nominal_freq_hz + + return DopplerInfo( + frequency_hz=corrected_freq_hz, + shift_hz=shift_hz, + range_rate_km_s=range_rate_km_s, + elevation=alt.degrees, + azimuth=az.degrees, + timestamp=datetime.now(timezone.utc), + ) + except Exception as e: + logger.error(f"Doppler calculation failed: {e}") + return None + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _resolve_tle(self) -> tuple[str, str, str] | None: + """Return the best available TLE tuple.""" + if self._tle_data: + return self._tle_data + + # Try the live TLE cache maintained by routes/satellite.py + try: + from routes.satellite import _tle_cache # type: ignore[import] + if _tle_cache: + tle = _tle_cache.get(self._satellite_name) + if tle: + return tle + except (ImportError, AttributeError): + pass + + # Fall back to static bundled data + try: + from data.satellites import TLE_SATELLITES + return TLE_SATELLITES.get(self._satellite_name) + except ImportError: + return None diff --git a/utils/ground_station/__init__.py b/utils/ground_station/__init__.py new file mode 100644 index 0000000..d33bb1f --- /dev/null +++ b/utils/ground_station/__init__.py @@ -0,0 +1,12 @@ +"""Ground station automation subpackage. + +Provides unattended satellite observation, Doppler correction, IQ recording +(SigMF), parallel multi-decoder pipelines, live spectrum, and optional +antenna rotator control. + +Public API:: + + from utils.ground_station.scheduler import get_ground_station_scheduler + from utils.ground_station.observation_profile import ObservationProfile + from utils.ground_station.iq_bus import IQBus +""" diff --git a/utils/ground_station/consumers/__init__.py b/utils/ground_station/consumers/__init__.py new file mode 100644 index 0000000..e5e2369 --- /dev/null +++ b/utils/ground_station/consumers/__init__.py @@ -0,0 +1 @@ +"""IQ bus consumer implementations.""" diff --git a/utils/ground_station/consumers/fm_demod.py b/utils/ground_station/consumers/fm_demod.py new file mode 100644 index 0000000..46f3f52 --- /dev/null +++ b/utils/ground_station/consumers/fm_demod.py @@ -0,0 +1,219 @@ +"""FMDemodConsumer — demodulates FM from CU8 IQ and pipes PCM to a decoder. + +Performs FM (or AM/USB/LSB) demodulation in-process using numpy — the +same algorithm as the listening-post waterfall monitor. The resulting +int16 PCM is written to the stdin of a configurable decoder subprocess +(e.g. direwolf for AX.25 AFSK or multimon-ng for GMSK/POCSAG). + +Decoded lines from the subprocess stdout are forwarded to an optional +``on_decoded`` callback. +""" + +from __future__ import annotations + +import subprocess +import threading +from typing import Callable + +import numpy as np + +from utils.logging import get_logger +from utils.process import register_process, safe_terminate, unregister_process +from utils.waterfall_fft import cu8_to_complex + +logger = get_logger('intercept.ground_station.fm_demod') + +AUDIO_RATE = 48_000 # Hz — standard rate for direwolf / multimon-ng + + +class FMDemodConsumer: + """CU8 IQ → FM demodulation → int16 PCM → decoder subprocess stdin.""" + + def __init__( + self, + decoder_cmd: list[str], + *, + modulation: str = 'fm', + on_decoded: Callable[[str], None] | None = None, + ): + """ + Args: + decoder_cmd: Decoder command + args, e.g. + ``['direwolf', '-r', '48000', '-']`` or + ``['multimon-ng', '-t', 'raw', '-a', 'AFSK1200', '-']``. + modulation: ``'fm'``, ``'am'``, ``'usb'``, ``'lsb'``. + on_decoded: Callback invoked with each decoded line from stdout. + """ + self._decoder_cmd = decoder_cmd + self._modulation = modulation.lower() + self._on_decoded = on_decoded + self._proc: subprocess.Popen | None = None + self._stdout_thread: threading.Thread | None = None + self._center_mhz = 0.0 + self._sample_rate = 0 + self._rotator_phase = 0.0 + + # ------------------------------------------------------------------ + # IQConsumer protocol + # ------------------------------------------------------------------ + + def on_start( + self, + center_mhz: float, + sample_rate: int, + *, + start_freq_mhz: float, + end_freq_mhz: float, + ) -> None: + self._center_mhz = center_mhz + self._sample_rate = sample_rate + self._rotator_phase = 0.0 + self._start_proc() + + def on_chunk(self, raw: bytes) -> None: + if self._proc is None or self._proc.poll() is not None: + return + try: + pcm, self._rotator_phase = _demodulate( + raw, + sample_rate=self._sample_rate, + center_mhz=self._center_mhz, + monitor_freq_mhz=self._center_mhz, # decode on-center + modulation=self._modulation, + rotator_phase=self._rotator_phase, + ) + if pcm and self._proc.stdin: + self._proc.stdin.write(pcm) + self._proc.stdin.flush() + except (BrokenPipeError, OSError): + pass # decoder exited + except Exception as e: + logger.debug(f"FMDemodConsumer on_chunk error: {e}") + + def on_stop(self) -> None: + if self._proc: + safe_terminate(self._proc) + unregister_process(self._proc) + self._proc = None + if self._stdout_thread and self._stdout_thread.is_alive(): + self._stdout_thread.join(timeout=2) + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _start_proc(self) -> None: + import shutil + if not shutil.which(self._decoder_cmd[0]): + logger.warning( + f"FMDemodConsumer: decoder '{self._decoder_cmd[0]}' not found — disabled" + ) + return + try: + self._proc = subprocess.Popen( + self._decoder_cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ) + register_process(self._proc) + self._stdout_thread = threading.Thread( + target=self._read_stdout, daemon=True, name='fm-demod-stdout' + ) + self._stdout_thread.start() + except Exception as e: + logger.error(f"FMDemodConsumer: failed to start decoder: {e}") + self._proc = None + + def _read_stdout(self) -> None: + assert self._proc is not None + assert self._proc.stdout is not None + try: + for line in self._proc.stdout: + decoded = line.decode('utf-8', errors='replace').rstrip() + if decoded and self._on_decoded: + try: + self._on_decoded(decoded) + except Exception as e: + logger.debug(f"FMDemodConsumer callback error: {e}") + except Exception: + pass + + +# --------------------------------------------------------------------------- +# In-process FM demodulation (mirrors waterfall_websocket._demodulate_monitor_audio) +# --------------------------------------------------------------------------- + + +def _demodulate( + raw: bytes, + sample_rate: int, + center_mhz: float, + monitor_freq_mhz: float, + modulation: str, + rotator_phase: float, +) -> tuple[bytes | None, float]: + """Demodulate CU8 IQ to int16 PCM. + + Returns ``(pcm_bytes, next_rotator_phase)``. + """ + if len(raw) < 32 or sample_rate <= 0: + return None, float(rotator_phase) + + samples = cu8_to_complex(raw) + fs = float(sample_rate) + freq_offset_hz = (float(monitor_freq_mhz) - float(center_mhz)) * 1e6 + nyquist = fs * 0.5 + if abs(freq_offset_hz) > nyquist * 0.98: + return None, float(rotator_phase) + + phase_inc = (2.0 * np.pi * freq_offset_hz) / fs + n = np.arange(samples.size, dtype=np.float64) + rotator = np.exp(-1j * (float(rotator_phase) + phase_inc * n)).astype(np.complex64) + next_phase = float((float(rotator_phase) + phase_inc * samples.size) % (2.0 * np.pi)) + shifted = samples * rotator + + mod = modulation.lower().strip() + target_bb = 48_000.0 + pre_decim = max(1, int(fs // target_bb)) + if pre_decim > 1: + usable = (shifted.size // pre_decim) * pre_decim + if usable < pre_decim: + return None, next_phase + shifted = shifted[:usable].reshape(-1, pre_decim).mean(axis=1) + fs1 = fs / pre_decim + + if shifted.size < 16: + return None, next_phase + + if mod == 'fm': + audio = np.angle(shifted[1:] * np.conj(shifted[:-1])).astype(np.float32) + elif mod == 'am': + envelope = np.abs(shifted).astype(np.float32) + audio = envelope - float(np.mean(envelope)) + elif mod == 'usb': + audio = np.real(shifted).astype(np.float32) + elif mod == 'lsb': + audio = -np.real(shifted).astype(np.float32) + else: + audio = np.real(shifted).astype(np.float32) + + if audio.size < 8: + return None, next_phase + + audio = audio - float(np.mean(audio)) + + # Resample to AUDIO_RATE + out_len = int(audio.size * AUDIO_RATE / fs1) + if out_len < 32: + return None, next_phase + x_old = np.linspace(0.0, 1.0, audio.size, endpoint=False, dtype=np.float32) + x_new = np.linspace(0.0, 1.0, out_len, endpoint=False, dtype=np.float32) + audio = np.interp(x_new, x_old, audio).astype(np.float32) + + peak = float(np.max(np.abs(audio))) if audio.size else 0.0 + if peak > 0: + audio = audio * min(20.0, 0.85 / peak) + + pcm = np.clip(audio, -1.0, 1.0) + return (pcm * 32767.0).astype(np.int16).tobytes(), next_phase diff --git a/utils/ground_station/consumers/gr_satellites.py b/utils/ground_station/consumers/gr_satellites.py new file mode 100644 index 0000000..e9222ba --- /dev/null +++ b/utils/ground_station/consumers/gr_satellites.py @@ -0,0 +1,154 @@ +"""GrSatConsumer — pipes CU8 IQ to gr_satellites for packet decoding. + +``gr_satellites`` is a GNU Radio-based multi-satellite decoder +(https://github.com/daniestevez/gr-satellites). It accepts complex +float32 (cf32) IQ samples on stdin when invoked with ``--iq``. + +This consumer converts CU8 → cf32 via numpy and pipes the result to +``gr_satellites``. If the tool is not installed it silently stays +disabled. + +Decoded JSON packets are forwarded to an optional ``on_decoded`` callback. +""" + +from __future__ import annotations + +import shutil +import subprocess +import threading +from typing import Callable + +import numpy as np + +from utils.logging import get_logger +from utils.process import register_process, safe_terminate, unregister_process + +logger = get_logger('intercept.ground_station.gr_satellites') + +GR_SATELLITES_BIN = 'gr_satellites' + + +class GrSatConsumer: + """CU8 IQ → cf32 → gr_satellites stdin → JSON packets.""" + + def __init__( + self, + satellite_name: str, + *, + on_decoded: Callable[[dict], None] | None = None, + ): + """ + Args: + satellite_name: Satellite name as known to gr_satellites + (e.g. ``'NOAA 15'``, ``'ISS'``). + on_decoded: Callback invoked with each parsed JSON packet dict. + """ + self._satellite_name = satellite_name + self._on_decoded = on_decoded + self._proc: subprocess.Popen | None = None + self._stdout_thread: threading.Thread | None = None + self._sample_rate = 0 + self._enabled = False + + # ------------------------------------------------------------------ + # IQConsumer protocol + # ------------------------------------------------------------------ + + def on_start( + self, + center_mhz: float, + sample_rate: int, + *, + start_freq_mhz: float, + end_freq_mhz: float, + ) -> None: + self._sample_rate = sample_rate + if not shutil.which(GR_SATELLITES_BIN): + logger.info( + "gr_satellites not found — GrSatConsumer disabled. " + "Install via: pip install gr-satellites or apt install python3-gr-satellites" + ) + self._enabled = False + return + self._start_proc(sample_rate) + + def on_chunk(self, raw: bytes) -> None: + if not self._enabled or self._proc is None or self._proc.poll() is not None: + return + # Convert CU8 → cf32 + try: + iq = np.frombuffer(raw, dtype=np.uint8).astype(np.float32) + cf32 = ((iq - 127.5) / 127.5).view(np.complex64) + if self._proc.stdin: + self._proc.stdin.write(cf32.tobytes()) + self._proc.stdin.flush() + except (BrokenPipeError, OSError): + pass + except Exception as e: + logger.debug(f"GrSatConsumer on_chunk error: {e}") + + def on_stop(self) -> None: + self._enabled = False + if self._proc: + safe_terminate(self._proc) + unregister_process(self._proc) + self._proc = None + if self._stdout_thread and self._stdout_thread.is_alive(): + self._stdout_thread.join(timeout=2) + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _start_proc(self, sample_rate: int) -> None: + import json as _json + + cmd = [ + GR_SATELLITES_BIN, + self._satellite_name, + '--samplerate', str(sample_rate), + '--iq', + '--json', + '-', + ] + try: + self._proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ) + register_process(self._proc) + self._enabled = True + self._stdout_thread = threading.Thread( + target=self._read_stdout, + args=(_json,), + daemon=True, + name='gr-sat-stdout', + ) + self._stdout_thread.start() + logger.info(f"GrSatConsumer started for '{self._satellite_name}'") + except Exception as e: + logger.error(f"GrSatConsumer: failed to start gr_satellites: {e}") + self._proc = None + self._enabled = False + + def _read_stdout(self, _json) -> None: + assert self._proc is not None + assert self._proc.stdout is not None + try: + for line in self._proc.stdout: + text = line.decode('utf-8', errors='replace').rstrip() + if not text: + continue + if self._on_decoded: + try: + data = _json.loads(text) + except _json.JSONDecodeError: + data = {'raw': text} + try: + self._on_decoded(data) + except Exception as e: + logger.debug(f"GrSatConsumer callback error: {e}") + except Exception: + pass diff --git a/utils/ground_station/consumers/sigmf_writer.py b/utils/ground_station/consumers/sigmf_writer.py new file mode 100644 index 0000000..4e5385c --- /dev/null +++ b/utils/ground_station/consumers/sigmf_writer.py @@ -0,0 +1,75 @@ +"""SigMFConsumer — wraps SigMFWriter as an IQ bus consumer.""" + +from __future__ import annotations + +from utils.logging import get_logger +from utils.sigmf import SigMFMetadata, SigMFWriter + +logger = get_logger('intercept.ground_station.sigmf_consumer') + + +class SigMFConsumer: + """IQ consumer that records CU8 chunks to a SigMF file pair.""" + + def __init__( + self, + metadata: SigMFMetadata, + on_complete: 'callable | None' = None, + ): + """ + Args: + metadata: Pre-populated SigMF metadata (satellite info, freq, etc.) + on_complete: Optional callback invoked with ``(meta_path, data_path)`` + when the recording is closed. + """ + self._metadata = metadata + self._on_complete = on_complete + self._writer: SigMFWriter | None = None + + # ------------------------------------------------------------------ + # IQConsumer protocol + # ------------------------------------------------------------------ + + def on_start( + self, + center_mhz: float, + sample_rate: int, + *, + start_freq_mhz: float, + end_freq_mhz: float, + ) -> None: + self._metadata.center_frequency_hz = center_mhz * 1e6 + self._metadata.sample_rate = sample_rate + self._writer = SigMFWriter(self._metadata) + try: + self._writer.open() + except Exception as e: + logger.error(f"SigMFConsumer: failed to open writer: {e}") + self._writer = None + + def on_chunk(self, raw: bytes) -> None: + if self._writer is None: + return + ok = self._writer.write_chunk(raw) + if not ok and self._writer.aborted: + logger.warning("SigMFConsumer: recording aborted (disk full)") + self._writer = None + + def on_stop(self) -> None: + if self._writer is None: + return + result = self._writer.close() + self._writer = None + if result and self._on_complete: + try: + self._on_complete(*result) + except Exception as e: + logger.debug(f"SigMFConsumer on_complete error: {e}") + + # ------------------------------------------------------------------ + # Status + # ------------------------------------------------------------------ + + @property + def bytes_written(self) -> int: + return self._writer.bytes_written if self._writer else 0 diff --git a/utils/ground_station/consumers/waterfall.py b/utils/ground_station/consumers/waterfall.py new file mode 100644 index 0000000..125ce69 --- /dev/null +++ b/utils/ground_station/consumers/waterfall.py @@ -0,0 +1,123 @@ +"""WaterfallConsumer — converts CU8 IQ chunks into binary waterfall frames. + +Frames are placed on an ``output_queue`` that the WebSocket endpoint +(``/ws/satellite_waterfall``) drains and sends to the browser. + +Reuses :mod:`utils.waterfall_fft` for FFT processing so the wire format +is identical to the main listening-post waterfall. +""" + +from __future__ import annotations + +import queue +import time + +import numpy as np + +from utils.logging import get_logger +from utils.waterfall_fft import ( + build_binary_frame, + compute_power_spectrum, + cu8_to_complex, + quantize_to_uint8, +) + +logger = get_logger('intercept.ground_station.waterfall_consumer') + +FFT_SIZE = 1024 +AVG_COUNT = 4 +FPS = 20 +DB_MIN: float | None = None # auto-range +DB_MAX: float | None = None + + +class WaterfallConsumer: + """IQ consumer that produces waterfall binary frames.""" + + def __init__( + self, + output_queue: queue.Queue | None = None, + fft_size: int = FFT_SIZE, + avg_count: int = AVG_COUNT, + fps: int = FPS, + db_min: float | None = DB_MIN, + db_max: float | None = DB_MAX, + ): + self.output_queue: queue.Queue = output_queue or queue.Queue(maxsize=120) + self._fft_size = fft_size + self._avg_count = avg_count + self._fps = fps + self._db_min = db_min + self._db_max = db_max + + self._center_mhz = 0.0 + self._start_freq = 0.0 + self._end_freq = 0.0 + self._sample_rate = 0 + self._buffer = b'' + self._required_bytes = 0 + self._frame_interval = 1.0 / max(1, fps) + self._last_frame_time = 0.0 + + # ------------------------------------------------------------------ + # IQConsumer protocol + # ------------------------------------------------------------------ + + def on_start( + self, + center_mhz: float, + sample_rate: int, + *, + start_freq_mhz: float, + end_freq_mhz: float, + ) -> None: + self._center_mhz = center_mhz + self._sample_rate = sample_rate + self._start_freq = start_freq_mhz + self._end_freq = end_freq_mhz + # How many IQ samples (pairs) we need for one FFT frame + required_samples = max( + self._fft_size * self._avg_count, + sample_rate // max(1, self._fps), + ) + self._required_bytes = required_samples * 2 # 1 byte I + 1 byte Q + self._frame_interval = 1.0 / max(1, self._fps) + self._buffer = b'' + self._last_frame_time = 0.0 + + def on_chunk(self, raw: bytes) -> None: + self._buffer += raw + now = time.monotonic() + if (now - self._last_frame_time) < self._frame_interval: + return + if len(self._buffer) < self._required_bytes: + return + + chunk = self._buffer[-self._required_bytes:] + self._buffer = b'' + self._last_frame_time = now + + try: + samples = cu8_to_complex(chunk) + power_db = compute_power_spectrum( + samples, fft_size=self._fft_size, avg_count=self._avg_count + ) + quantized = quantize_to_uint8(power_db, db_min=self._db_min, db_max=self._db_max) + frame = build_binary_frame(self._start_freq, self._end_freq, quantized) + except Exception as e: + logger.debug(f"WaterfallConsumer FFT error: {e}") + return + + # Non-blocking enqueue: drop oldest if full + if self.output_queue.full(): + try: + self.output_queue.get_nowait() + except queue.Empty: + pass + try: + self.output_queue.put_nowait(frame) + except queue.Full: + pass + + def on_stop(self) -> None: + self._buffer = b'' diff --git a/utils/ground_station/iq_bus.py b/utils/ground_station/iq_bus.py new file mode 100644 index 0000000..b5eaafc --- /dev/null +++ b/utils/ground_station/iq_bus.py @@ -0,0 +1,307 @@ +"""IQ broadcast bus — single SDR producer, multiple consumers. + +The :class:`IQBus` claims an SDR device, spawns a capture subprocess +(``rx_sdr`` / ``rtl_sdr``), reads raw CU8 bytes from stdout in a +producer thread, and calls :meth:`IQConsumer.on_chunk` on every +registered consumer for each chunk. + +Consumers are responsible for their own internal buffering. The bus +does *not* block on slow consumers — each consumer's ``on_chunk`` is +called in the producer thread, so consumers must be non-blocking. +""" + +from __future__ import annotations + +import shutil +import subprocess +import threading +import time +from typing import Protocol, runtime_checkable + +from utils.logging import get_logger +from utils.process import register_process, safe_terminate, unregister_process + +logger = get_logger('intercept.ground_station.iq_bus') + +CHUNK_SIZE = 65_536 # bytes per read (~27 ms @ 2.4 Msps CU8) + + +@runtime_checkable +class IQConsumer(Protocol): + """Protocol for objects that receive raw CU8 chunks from the IQ bus.""" + + def on_chunk(self, raw: bytes) -> None: + """Called with each raw CU8 chunk from the SDR. Must be fast.""" + ... + + def on_start( + self, + center_mhz: float, + sample_rate: int, + *, + start_freq_mhz: float, + end_freq_mhz: float, + ) -> None: + """Called once when the bus starts, before the first chunk.""" + ... + + def on_stop(self) -> None: + """Called once when the bus stops (LOS or manual stop).""" + ... + + +class _NoopConsumer: + """Fallback used internally for isinstance checks.""" + + def on_chunk(self, raw: bytes) -> None: + pass + + def on_start(self, center_mhz, sample_rate, *, start_freq_mhz, end_freq_mhz): + pass + + def on_stop(self) -> None: + pass + + +class IQBus: + """Single-SDR IQ capture bus with fan-out to multiple consumers.""" + + def __init__( + self, + *, + center_mhz: float, + sample_rate: int = 2_400_000, + gain: float | None = None, + device_index: int = 0, + sdr_type: str = 'rtlsdr', + ppm: int | None = None, + bias_t: bool = False, + ): + self._center_mhz = center_mhz + self._sample_rate = sample_rate + self._gain = gain + self._device_index = device_index + self._sdr_type = sdr_type + self._ppm = ppm + self._bias_t = bias_t + + self._consumers: list[IQConsumer] = [] + self._consumers_lock = threading.Lock() + self._proc: subprocess.Popen | None = None + self._producer_thread: threading.Thread | None = None + self._stop_event = threading.Event() + self._running = False + self._current_freq_mhz = center_mhz + + # ------------------------------------------------------------------ + # Consumer management + # ------------------------------------------------------------------ + + def add_consumer(self, consumer: IQConsumer) -> None: + with self._consumers_lock: + if consumer not in self._consumers: + self._consumers.append(consumer) + + def remove_consumer(self, consumer: IQConsumer) -> None: + with self._consumers_lock: + self._consumers = [c for c in self._consumers if c is not consumer] + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def start(self) -> tuple[bool, str]: + """Start IQ capture. Returns (success, error_message).""" + if self._running: + return True, '' + + try: + cmd = self._build_command(self._center_mhz) + except Exception as e: + return False, f'Failed to build IQ capture command: {e}' + + if not shutil.which(cmd[0]): + return False, f'Required tool "{cmd[0]}" not found. Install SoapySDR (rx_sdr) or rtl-sdr.' + + try: + self._proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + register_process(self._proc) + except Exception as e: + return False, f'Failed to spawn IQ capture: {e}' + + # Brief check that the process actually started + time.sleep(0.3) + if self._proc.poll() is not None: + stderr_out = '' + if self._proc.stderr: + try: + stderr_out = self._proc.stderr.read().decode('utf-8', errors='replace').strip() + except Exception: + pass + unregister_process(self._proc) + self._proc = None + detail = f': {stderr_out}' if stderr_out else '' + return False, f'IQ capture process exited immediately{detail}' + + self._stop_event.clear() + self._running = True + + span_mhz = self._sample_rate / 1e6 + start_freq_mhz = self._center_mhz - span_mhz / 2 + end_freq_mhz = self._center_mhz + span_mhz / 2 + + with self._consumers_lock: + for consumer in list(self._consumers): + try: + consumer.on_start( + self._center_mhz, + self._sample_rate, + start_freq_mhz=start_freq_mhz, + end_freq_mhz=end_freq_mhz, + ) + except Exception as e: + logger.warning(f"Consumer on_start error: {e}") + + self._producer_thread = threading.Thread( + target=self._producer_loop, daemon=True, name='iq-bus-producer' + ) + self._producer_thread.start() + logger.info( + f"IQBus started: {self._center_mhz} MHz, sr={self._sample_rate}, " + f"device={self._sdr_type}:{self._device_index}" + ) + return True, '' + + def stop(self) -> None: + """Stop IQ capture and notify all consumers.""" + self._stop_event.set() + if self._proc: + safe_terminate(self._proc) + unregister_process(self._proc) + self._proc = None + if self._producer_thread and self._producer_thread.is_alive(): + self._producer_thread.join(timeout=3) + self._running = False + + with self._consumers_lock: + for consumer in list(self._consumers): + try: + consumer.on_stop() + except Exception as e: + logger.warning(f"Consumer on_stop error: {e}") + + logger.info("IQBus stopped") + + def retune(self, new_freq_mhz: float) -> tuple[bool, str]: + """Retune by stopping and restarting the capture process.""" + self._current_freq_mhz = new_freq_mhz + if not self._running: + return False, 'Not running' + + # Stop the current process + self._stop_event.set() + if self._proc: + safe_terminate(self._proc) + unregister_process(self._proc) + self._proc = None + if self._producer_thread and self._producer_thread.is_alive(): + self._producer_thread.join(timeout=2) + + # Restart at new frequency + self._stop_event.clear() + try: + cmd = self._build_command(new_freq_mhz) + self._proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + register_process(self._proc) + except Exception as e: + self._running = False + return False, f'Retune failed: {e}' + + self._producer_thread = threading.Thread( + target=self._producer_loop, daemon=True, name='iq-bus-producer' + ) + self._producer_thread.start() + logger.info(f"IQBus retuned to {new_freq_mhz:.6f} MHz") + return True, '' + + @property + def running(self) -> bool: + return self._running + + @property + def center_mhz(self) -> float: + return self._current_freq_mhz + + @property + def sample_rate(self) -> int: + return self._sample_rate + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _producer_loop(self) -> None: + """Read CU8 chunks from the subprocess and fan out to consumers.""" + assert self._proc is not None + assert self._proc.stdout is not None + + try: + while not self._stop_event.is_set(): + if self._proc.poll() is not None: + logger.warning("IQBus: capture process exited unexpectedly") + break + raw = self._proc.stdout.read(CHUNK_SIZE) + if not raw: + break + with self._consumers_lock: + consumers = list(self._consumers) + for consumer in consumers: + try: + consumer.on_chunk(raw) + except Exception as e: + logger.warning(f"Consumer on_chunk error: {e}") + except Exception as e: + logger.error(f"IQBus producer loop error: {e}") + + def _build_command(self, freq_mhz: float) -> list[str]: + """Build the IQ capture command using the SDR factory.""" + from utils.sdr import SDRFactory, SDRType + from utils.sdr.base import SDRDevice + + type_map = { + 'rtlsdr': SDRType.RTL_SDR, + 'rtl_sdr': SDRType.RTL_SDR, + 'hackrf': SDRType.HACKRF, + 'limesdr': SDRType.LIME_SDR, + 'airspy': SDRType.AIRSPY, + 'sdrplay': SDRType.SDRPLAY, + } + sdr_type = type_map.get(self._sdr_type.lower(), SDRType.RTL_SDR) + builder = SDRFactory.get_builder(sdr_type) + caps = builder.get_capabilities() + device = SDRDevice( + sdr_type=sdr_type, + index=self._device_index, + name=f'{sdr_type.value}-{self._device_index}', + serial='N/A', + driver=sdr_type.value, + capabilities=caps, + ) + return builder.build_iq_capture_command( + device=device, + frequency_mhz=freq_mhz, + sample_rate=self._sample_rate, + gain=self._gain, + ppm=self._ppm, + bias_t=self._bias_t, + ) diff --git a/utils/ground_station/observation_profile.py b/utils/ground_station/observation_profile.py new file mode 100644 index 0000000..3d45be0 --- /dev/null +++ b/utils/ground_station/observation_profile.py @@ -0,0 +1,140 @@ +"""Observation profile dataclass and DB CRUD. + +An ObservationProfile describes *how* to capture a particular satellite: +frequency, decoder type, gain, bandwidth, minimum elevation, and whether +to record raw IQ in SigMF format. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any + +from utils.logging import get_logger + +logger = get_logger('intercept.ground_station.profile') + + +@dataclass +class ObservationProfile: + """Per-satellite capture configuration.""" + + norad_id: int + name: str # Human-readable label + frequency_mhz: float + decoder_type: str # 'fm', 'afsk', 'bpsk', 'gmsk', 'iq_only' + gain: float = 40.0 + bandwidth_hz: int = 200_000 + min_elevation: float = 10.0 + enabled: bool = True + record_iq: bool = False + iq_sample_rate: int = 2_400_000 + id: int | None = None + created_at: str = field( + default_factory=lambda: datetime.now(timezone.utc).isoformat() + ) + + def to_dict(self) -> dict[str, Any]: + return { + 'id': self.id, + 'norad_id': self.norad_id, + 'name': self.name, + 'frequency_mhz': self.frequency_mhz, + 'decoder_type': self.decoder_type, + 'gain': self.gain, + 'bandwidth_hz': self.bandwidth_hz, + 'min_elevation': self.min_elevation, + 'enabled': self.enabled, + 'record_iq': self.record_iq, + 'iq_sample_rate': self.iq_sample_rate, + 'created_at': self.created_at, + } + + @classmethod + def from_row(cls, row) -> 'ObservationProfile': + return cls( + id=row['id'], + norad_id=row['norad_id'], + name=row['name'], + frequency_mhz=row['frequency_mhz'], + decoder_type=row['decoder_type'], + gain=row['gain'], + bandwidth_hz=row['bandwidth_hz'], + min_elevation=row['min_elevation'], + enabled=bool(row['enabled']), + record_iq=bool(row['record_iq']), + iq_sample_rate=row['iq_sample_rate'], + created_at=row['created_at'], + ) + + +# --------------------------------------------------------------------------- +# DB CRUD +# --------------------------------------------------------------------------- + + +def list_profiles() -> list[ObservationProfile]: + """Return all observation profiles from the database.""" + from utils.database import get_db + with get_db() as conn: + rows = conn.execute( + 'SELECT * FROM observation_profiles ORDER BY created_at DESC' + ).fetchall() + return [ObservationProfile.from_row(r) for r in rows] + + +def get_profile(norad_id: int) -> ObservationProfile | None: + """Return the profile for a NORAD ID, or None if not found.""" + from utils.database import get_db + with get_db() as conn: + row = conn.execute( + 'SELECT * FROM observation_profiles WHERE norad_id = ?', (norad_id,) + ).fetchone() + return ObservationProfile.from_row(row) if row else None + + +def save_profile(profile: ObservationProfile) -> ObservationProfile: + """Insert or replace an observation profile. Returns the saved profile.""" + from utils.database import get_db + with get_db() as conn: + conn.execute(''' + INSERT INTO observation_profiles + (norad_id, name, frequency_mhz, decoder_type, gain, + bandwidth_hz, min_elevation, enabled, record_iq, + iq_sample_rate, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(norad_id) DO UPDATE SET + name=excluded.name, + frequency_mhz=excluded.frequency_mhz, + decoder_type=excluded.decoder_type, + gain=excluded.gain, + bandwidth_hz=excluded.bandwidth_hz, + min_elevation=excluded.min_elevation, + enabled=excluded.enabled, + record_iq=excluded.record_iq, + iq_sample_rate=excluded.iq_sample_rate + ''', ( + profile.norad_id, + profile.name, + profile.frequency_mhz, + profile.decoder_type, + profile.gain, + profile.bandwidth_hz, + profile.min_elevation, + int(profile.enabled), + int(profile.record_iq), + profile.iq_sample_rate, + profile.created_at, + )) + return get_profile(profile.norad_id) or profile + + +def delete_profile(norad_id: int) -> bool: + """Delete a profile by NORAD ID. Returns True if a row was deleted.""" + from utils.database import get_db + with get_db() as conn: + cur = conn.execute( + 'DELETE FROM observation_profiles WHERE norad_id = ?', (norad_id,) + ) + return cur.rowcount > 0 diff --git a/utils/ground_station/scheduler.py b/utils/ground_station/scheduler.py new file mode 100644 index 0000000..f874604 --- /dev/null +++ b/utils/ground_station/scheduler.py @@ -0,0 +1,794 @@ +"""Ground station automated observation scheduler. + +Watches enabled :class:`~utils.ground_station.observation_profile.ObservationProfile` +entries, predicts passes for each satellite, fires a capture at AOS, and +stops it at LOS. + +During a capture: +* An :class:`~utils.ground_station.iq_bus.IQBus` claims the SDR device. +* Consumers are attached according to ``profile.decoder_type``: + - ``'iq_only'`` → SigMFConsumer only (if ``record_iq`` is True). + - ``'fm'`` → FMDemodConsumer (direwolf AX.25) + optional SigMF. + - ``'afsk'`` → FMDemodConsumer (direwolf AX.25) + optional SigMF. + - ``'gmsk'`` → FMDemodConsumer (multimon-ng) + optional SigMF. + - ``'bpsk'`` → GrSatConsumer + optional SigMF. +* A WaterfallConsumer is always attached for the live spectrum panel. +* A Doppler correction thread retunes the IQ bus every 5 s if shift > threshold. +* A rotator control thread points the antenna (if rotctld is available). +""" + +from __future__ import annotations + +import json +import queue +import threading +import time +import uuid +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, Callable + +from utils.logging import get_logger + +logger = get_logger('intercept.ground_station.scheduler') + +# Env-configurable Doppler retune threshold (Hz) +try: + from config import GS_DOPPLER_THRESHOLD_HZ # type: ignore[import] +except (ImportError, AttributeError): + import os + GS_DOPPLER_THRESHOLD_HZ = int(os.environ.get('INTERCEPT_GS_DOPPLER_THRESHOLD_HZ', 500)) + +DOPPLER_INTERVAL_SECONDS = 5 +SCHEDULE_REFRESH_MINUTES = 30 +CAPTURE_BUFFER_SECONDS = 30 + + +# --------------------------------------------------------------------------- +# Scheduled observation (state machine) +# --------------------------------------------------------------------------- + +class ScheduledObservation: + """A single scheduled pass for a profile.""" + + def __init__( + self, + profile_norad_id: int, + satellite_name: str, + aos_iso: str, + los_iso: str, + max_el: float, + ): + self.id = str(uuid.uuid4())[:8] + self.profile_norad_id = profile_norad_id + self.satellite_name = satellite_name + self.aos_iso = aos_iso + self.los_iso = los_iso + self.max_el = max_el + self.status: str = 'scheduled' + self._start_timer: threading.Timer | None = None + self._stop_timer: threading.Timer | None = None + + @property + def aos_dt(self) -> datetime: + return _parse_utc_iso(self.aos_iso) + + @property + def los_dt(self) -> datetime: + return _parse_utc_iso(self.los_iso) + + def to_dict(self) -> dict[str, Any]: + return { + 'id': self.id, + 'norad_id': self.profile_norad_id, + 'satellite': self.satellite_name, + 'aos': self.aos_iso, + 'los': self.los_iso, + 'max_el': self.max_el, + 'status': self.status, + } + + +# --------------------------------------------------------------------------- +# Scheduler +# --------------------------------------------------------------------------- + +class GroundStationScheduler: + """Automated ground station observation scheduler.""" + + def __init__(self): + self._enabled = False + self._lock = threading.Lock() + self._observations: list[ScheduledObservation] = [] + self._refresh_timer: threading.Timer | None = None + self._event_callback: Callable[[dict[str, Any]], None] | None = None + + # Active capture state + self._active_obs: ScheduledObservation | None = None + self._active_iq_bus = None # IQBus instance + self._active_waterfall_consumer = None + self._doppler_thread: threading.Thread | None = None + self._doppler_stop = threading.Event() + self._active_profile = None # ObservationProfile + self._active_doppler_tracker = None # DopplerTracker + + # Shared waterfall queue (consumed by /ws/satellite_waterfall) + self.waterfall_queue: queue.Queue = queue.Queue(maxsize=120) + + # Observer location + self._lat: float = 0.0 + self._lon: float = 0.0 + self._device: int = 0 + self._sdr_type: str = 'rtlsdr' + + # ------------------------------------------------------------------ + # Public control API + # ------------------------------------------------------------------ + + def set_event_callback( + self, callback: Callable[[dict[str, Any]], None] + ) -> None: + self._event_callback = callback + + def enable( + self, + lat: float, + lon: float, + device: int = 0, + sdr_type: str = 'rtlsdr', + ) -> dict[str, Any]: + with self._lock: + self._lat = lat + self._lon = lon + self._device = device + self._sdr_type = sdr_type + self._enabled = True + self._refresh_schedule() + return self.get_status() + + def disable(self) -> dict[str, Any]: + with self._lock: + self._enabled = False + if self._refresh_timer: + self._refresh_timer.cancel() + self._refresh_timer = None + for obs in self._observations: + if obs._start_timer: + obs._start_timer.cancel() + if obs._stop_timer: + obs._stop_timer.cancel() + self._observations.clear() + self._stop_active_capture(reason='scheduler_disabled') + return {'status': 'disabled'} + + @property + def enabled(self) -> bool: + return self._enabled + + def get_status(self) -> dict[str, Any]: + with self._lock: + active = self._active_obs.to_dict() if self._active_obs else None + return { + 'enabled': self._enabled, + 'observer': {'latitude': self._lat, 'longitude': self._lon}, + 'device': self._device, + 'sdr_type': self._sdr_type, + 'scheduled_count': sum( + 1 for o in self._observations if o.status == 'scheduled' + ), + 'total_observations': len(self._observations), + 'active_observation': active, + 'waterfall_active': self._active_iq_bus is not None + and self._active_iq_bus.running, + } + + def get_scheduled_observations(self) -> list[dict[str, Any]]: + with self._lock: + return [o.to_dict() for o in self._observations] + + def trigger_manual(self, norad_id: int) -> tuple[bool, str]: + """Immediately start a manual observation for the given NORAD ID.""" + from utils.ground_station.observation_profile import get_profile + profile = get_profile(norad_id) + if not profile: + return False, f'No observation profile for NORAD {norad_id}' + obs = ScheduledObservation( + profile_norad_id=norad_id, + satellite_name=profile.name, + aos_iso=datetime.now(timezone.utc).isoformat(), + los_iso=(datetime.now(timezone.utc) + timedelta(minutes=15)).isoformat(), + max_el=90.0, + ) + self._execute_observation(obs) + return True, 'Manual observation started' + + def stop_active(self) -> dict[str, Any]: + """Stop the currently running observation.""" + self._stop_active_capture(reason='manual_stop') + return self.get_status() + + # ------------------------------------------------------------------ + # Schedule management + # ------------------------------------------------------------------ + + def _refresh_schedule(self) -> None: + if not self._enabled: + return + + from utils.ground_station.observation_profile import list_profiles + + profiles = [p for p in list_profiles() if p.enabled] + if not profiles: + logger.info("Ground station scheduler: no enabled profiles") + self._arm_refresh_timer() + return + + try: + passes_by_profile = self._predict_passes_for_profiles(profiles) + except Exception as e: + logger.error(f"Ground station scheduler: pass prediction failed: {e}") + self._arm_refresh_timer() + return + + with self._lock: + # Cancel existing scheduled timers (keep active/complete) + for obs in self._observations: + if obs.status == 'scheduled': + if obs._start_timer: + obs._start_timer.cancel() + if obs._stop_timer: + obs._stop_timer.cancel() + + history = [o for o in self._observations if o.status in ('complete', 'capturing', 'failed')] + self._observations = history + + now = datetime.now(timezone.utc) + buf = CAPTURE_BUFFER_SECONDS + + for obs in passes_by_profile: + capture_start = obs.aos_dt - timedelta(seconds=buf) + capture_end = obs.los_dt + timedelta(seconds=buf) + + if capture_end <= now: + continue + if any(h.id == obs.id for h in history): + continue + + delay = max(0.0, (capture_start - now).total_seconds()) + obs._start_timer = threading.Timer( + delay, self._execute_observation, args=[obs] + ) + obs._start_timer.daemon = True + obs._start_timer.start() + self._observations.append(obs) + + scheduled = sum(1 for o in self._observations if o.status == 'scheduled') + logger.info(f"Ground station scheduler refreshed: {scheduled} observations scheduled") + + self._arm_refresh_timer() + + def _arm_refresh_timer(self) -> None: + if self._refresh_timer: + self._refresh_timer.cancel() + if not self._enabled: + return + self._refresh_timer = threading.Timer( + SCHEDULE_REFRESH_MINUTES * 60, self._refresh_schedule + ) + self._refresh_timer.daemon = True + self._refresh_timer.start() + + def _predict_passes_for_profiles( + self, profiles: list + ) -> list[ScheduledObservation]: + """Predict passes for each profile and return ScheduledObservation list.""" + from skyfield.api import load, wgs84 + from utils.satellite_predict import predict_passes as _predict_passes + + try: + ts = load.timescale() + except Exception: + from skyfield.api import load as _load + ts = _load.timescale() + + observer = wgs84.latlon(self._lat, self._lon) + now = datetime.now(timezone.utc) + import datetime as _dt + t0 = ts.utc(now) + t1 = ts.utc(now + _dt.timedelta(hours=24)) + + observations: list[ScheduledObservation] = [] + + for profile in profiles: + tle = _find_tle_by_norad(profile.norad_id) + if tle is None: + logger.warning( + f"No TLE for NORAD {profile.norad_id} ({profile.name}) — skipping" + ) + continue + try: + passes = _predict_passes( + tle_data=tle, + observer=observer, + ts=ts, + t0=t0, + t1=t1, + min_el=profile.min_elevation, + include_trajectory=False, + include_ground_track=False, + ) + except Exception as e: + logger.warning(f"Pass prediction failed for {profile.name}: {e}") + continue + + for p in passes: + obs = ScheduledObservation( + profile_norad_id=profile.norad_id, + satellite_name=profile.name, + aos_iso=p.get('startTimeISO', ''), + los_iso=p.get('endTimeISO', ''), + max_el=float(p.get('maxEl', 0.0)), + ) + observations.append(obs) + + return observations + + # ------------------------------------------------------------------ + # Capture execution + # ------------------------------------------------------------------ + + def _execute_observation(self, obs: ScheduledObservation) -> None: + """Called at AOS (+ buffer) to start IQ capture.""" + if not self._enabled: + return + if obs.status == 'scheduled': + obs.status = 'capturing' + else: + return # already cancelled / complete + + from utils.ground_station.observation_profile import get_profile + profile = get_profile(obs.profile_norad_id) + if not profile or not profile.enabled: + obs.status = 'failed' + return + + # Claim SDR device + try: + import app as _app + err = _app.claim_sdr_device(self._device, 'ground_station_iq_bus', self._sdr_type) + if err: + logger.warning(f"Ground station: SDR busy — skipping {obs.satellite_name}: {err}") + obs.status = 'failed' + self._emit_event({'type': 'observation_skipped', 'observation': obs.to_dict(), 'reason': 'device_busy'}) + return + except ImportError: + pass + + # Create DB record + obs_db_id = _insert_observation_record(obs, profile) + + # Build IQ bus + from utils.ground_station.iq_bus import IQBus + bus = IQBus( + center_mhz=profile.frequency_mhz, + sample_rate=profile.iq_sample_rate, + gain=profile.gain, + device_index=self._device, + sdr_type=self._sdr_type, + ) + + # Attach waterfall consumer (always) + from utils.ground_station.consumers.waterfall import WaterfallConsumer + wf_consumer = WaterfallConsumer(output_queue=self.waterfall_queue) + bus.add_consumer(wf_consumer) + + # Attach decoder consumers + self._attach_decoder_consumers(bus, profile, obs_db_id, obs) + + # Attach SigMF consumer if requested + if profile.record_iq: + self._attach_sigmf_consumer(bus, profile, obs_db_id) + + # Start bus + ok, err_msg = bus.start() + if not ok: + logger.error(f"Ground station: failed to start IQBus for {obs.satellite_name}: {err_msg}") + obs.status = 'failed' + try: + import app as _app + _app.release_sdr_device(self._device, self._sdr_type) + except ImportError: + pass + self._emit_event({'type': 'observation_failed', 'observation': obs.to_dict(), 'reason': err_msg}) + return + + with self._lock: + self._active_obs = obs + self._active_iq_bus = bus + self._active_waterfall_consumer = wf_consumer + self._active_profile = profile + + # Emit iq_bus_started SSE event (used by Phase 5 waterfall) + span_mhz = profile.iq_sample_rate / 1e6 + self._emit_event({ + 'type': 'iq_bus_started', + 'observation': obs.to_dict(), + 'center_mhz': profile.frequency_mhz, + 'span_mhz': span_mhz, + }) + self._emit_event({'type': 'observation_started', 'observation': obs.to_dict()}) + logger.info(f"Ground station: observation started for {obs.satellite_name} (NORAD {obs.profile_norad_id})") + + # Start Doppler correction thread + self._start_doppler_thread(profile, obs) + + # Schedule stop at LOS + buffer + now = datetime.now(timezone.utc) + stop_delay = (obs.los_dt + timedelta(seconds=CAPTURE_BUFFER_SECONDS) - now).total_seconds() + if stop_delay > 0: + obs._stop_timer = threading.Timer( + stop_delay, self._stop_active_capture, kwargs={'reason': 'los'} + ) + obs._stop_timer.daemon = True + obs._stop_timer.start() + else: + self._stop_active_capture(reason='los_immediate') + + def _stop_active_capture(self, *, reason: str = 'manual') -> None: + """Stop the currently active capture and release the SDR device.""" + with self._lock: + bus = self._active_iq_bus + obs = self._active_obs + self._active_iq_bus = None + self._active_obs = None + self._active_waterfall_consumer = None + self._active_profile = None + self._active_doppler_tracker = None + + self._doppler_stop.set() + + if bus and bus.running: + bus.stop() + + if obs: + obs.status = 'complete' + _update_observation_status(obs, 'complete') + self._emit_event({ + 'type': 'observation_complete', + 'observation': obs.to_dict(), + 'reason': reason, + }) + self._emit_event({'type': 'iq_bus_stopped', 'observation': obs.to_dict()}) + + try: + import app as _app + _app.release_sdr_device(self._device, self._sdr_type) + except ImportError: + pass + + logger.info(f"Ground station: observation stopped ({reason})") + + # ------------------------------------------------------------------ + # Consumer attachment helpers + # ------------------------------------------------------------------ + + def _attach_decoder_consumers(self, bus, profile, obs_db_id: int | None, obs) -> None: + """Attach the appropriate decoder consumer based on profile.decoder_type.""" + decoder_type = (profile.decoder_type or '').lower() + + if decoder_type in ('fm', 'afsk'): + # direwolf for AX.25 / AFSK + import shutil + if shutil.which('direwolf'): + from utils.ground_station.consumers.fm_demod import FMDemodConsumer + consumer = FMDemodConsumer( + decoder_cmd=[ + 'direwolf', '-r', '48000', '-n', '1', '-b', '16', '-', + ], + modulation='fm', + on_decoded=lambda line: self._on_packet_decoded(line, obs_db_id, obs), + ) + bus.add_consumer(consumer) + logger.info("Ground station: attached direwolf AX.25 decoder") + else: + logger.warning("direwolf not found — AX.25 decoding disabled") + + elif decoder_type == 'gmsk': + import shutil + if shutil.which('multimon-ng'): + from utils.ground_station.consumers.fm_demod import FMDemodConsumer + consumer = FMDemodConsumer( + decoder_cmd=['multimon-ng', '-t', 'raw', '-a', 'GMSK', '-'], + modulation='fm', + on_decoded=lambda line: self._on_packet_decoded(line, obs_db_id, obs), + ) + bus.add_consumer(consumer) + logger.info("Ground station: attached multimon-ng GMSK decoder") + else: + logger.warning("multimon-ng not found — GMSK decoding disabled") + + elif decoder_type == 'bpsk': + from utils.ground_station.consumers.gr_satellites import GrSatConsumer + consumer = GrSatConsumer( + satellite_name=profile.name, + on_decoded=lambda pkt: self._on_packet_decoded( + json.dumps(pkt) if isinstance(pkt, dict) else str(pkt), + obs_db_id, + obs, + ), + ) + bus.add_consumer(consumer) + + # 'iq_only' → no decoder, just SigMF + + def _attach_sigmf_consumer(self, bus, profile, obs_db_id: int | None) -> None: + """Attach a SigMFConsumer for raw IQ recording.""" + from utils.sigmf import SigMFMetadata + from utils.ground_station.consumers.sigmf_writer import SigMFConsumer + + meta = SigMFMetadata( + sample_rate=profile.iq_sample_rate, + center_frequency_hz=profile.frequency_mhz * 1e6, + satellite_name=profile.name, + norad_id=profile.norad_id, + latitude=self._lat, + longitude=self._lon, + ) + + def _on_recording_complete(meta_path, data_path): + _insert_recording_record(obs_db_id, meta_path, data_path, profile) + self._emit_event({ + 'type': 'recording_complete', + 'norad_id': profile.norad_id, + 'data_path': str(data_path), + 'meta_path': str(meta_path), + }) + + consumer = SigMFConsumer(metadata=meta, on_complete=_on_recording_complete) + bus.add_consumer(consumer) + logger.info(f"Ground station: SigMF recording enabled for {profile.name}") + + # ------------------------------------------------------------------ + # Doppler correction (Phase 2) + # ------------------------------------------------------------------ + + def _start_doppler_thread(self, profile, obs: ScheduledObservation) -> None: + """Start the Doppler tracking/retune thread for an active capture.""" + from utils.doppler import DopplerTracker + + tle = _find_tle_by_norad(profile.norad_id) + if tle is None: + logger.info(f"Ground station: no TLE for {profile.name} — Doppler disabled") + return + + tracker = DopplerTracker(satellite_name=profile.name, tle_data=tle) + if not tracker.configure(self._lat, self._lon): + logger.info(f"Ground station: Doppler tracking not available for {profile.name}") + return + + with self._lock: + self._active_doppler_tracker = tracker + + self._doppler_stop.clear() + t = threading.Thread( + target=self._doppler_loop, + args=[profile, tracker], + daemon=True, + name='gs-doppler', + ) + t.start() + self._doppler_thread = t + logger.info(f"Ground station: Doppler tracking started for {profile.name}") + + def _doppler_loop(self, profile, tracker) -> None: + """Periodically compute Doppler shift and retune if necessary.""" + while not self._doppler_stop.wait(DOPPLER_INTERVAL_SECONDS): + with self._lock: + bus = self._active_iq_bus + + if bus is None or not bus.running: + break + + info = tracker.calculate(profile.frequency_mhz) + if info is None: + continue + + # Retune if shift exceeds threshold + if abs(info.shift_hz) >= GS_DOPPLER_THRESHOLD_HZ: + corrected_mhz = info.frequency_hz / 1_000_000 + logger.info( + f"Ground station: Doppler retune {info.shift_hz:+.1f} Hz → " + f"{corrected_mhz:.6f} MHz (el={info.elevation:.1f}°)" + ) + bus.retune(corrected_mhz) + self._emit_event({ + 'type': 'doppler_update', + 'norad_id': profile.norad_id, + **info.to_dict(), + }) + + # Rotator control (Phase 6) + try: + from utils.rotator import get_rotator + rotator = get_rotator() + if rotator.enabled: + rotator.point_to(info.azimuth, info.elevation) + except Exception: + pass + + logger.debug("Ground station: Doppler loop exited") + + # ------------------------------------------------------------------ + # Packet / event callbacks + # ------------------------------------------------------------------ + + def _on_packet_decoded(self, line: str, obs_db_id: int | None, obs: ScheduledObservation) -> None: + """Handle a decoded packet line from a decoder consumer.""" + if not line: + return + _insert_event_record(obs_db_id, 'packet', line) + self._emit_event({ + 'type': 'packet_decoded', + 'norad_id': obs.profile_norad_id, + 'satellite': obs.satellite_name, + 'data': line, + }) + + def _emit_event(self, event: dict[str, Any]) -> None: + if self._event_callback: + try: + self._event_callback(event) + except Exception as e: + logger.debug(f"Event callback error: {e}") + + +# --------------------------------------------------------------------------- +# DB helpers +# --------------------------------------------------------------------------- + + +def _insert_observation_record(obs: ScheduledObservation, profile) -> int | None: + try: + from utils.database import get_db + from datetime import datetime, timezone + with get_db() as conn: + cur = conn.execute(''' + INSERT INTO ground_station_observations + (profile_id, norad_id, satellite, aos_time, los_time, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ''', ( + profile.id, + obs.profile_norad_id, + obs.satellite_name, + obs.aos_iso, + obs.los_iso, + 'capturing', + datetime.now(timezone.utc).isoformat(), + )) + return cur.lastrowid + except Exception as e: + logger.warning(f"Failed to insert observation record: {e}") + return None + + +def _update_observation_status(obs: ScheduledObservation, status: str) -> None: + try: + from utils.database import get_db + with get_db() as conn: + conn.execute( + 'UPDATE ground_station_observations SET status=? WHERE norad_id=? AND status=?', + (status, obs.profile_norad_id, 'capturing'), + ) + except Exception as e: + logger.debug(f"Failed to update observation status: {e}") + + +def _insert_event_record(obs_db_id: int | None, event_type: str, payload: str) -> None: + if obs_db_id is None: + return + try: + from utils.database import get_db + from datetime import datetime, timezone + with get_db() as conn: + conn.execute(''' + INSERT INTO ground_station_events (observation_id, event_type, payload_json, timestamp) + VALUES (?, ?, ?, ?) + ''', (obs_db_id, event_type, payload, datetime.now(timezone.utc).isoformat())) + except Exception as e: + logger.debug(f"Failed to insert event record: {e}") + + +def _insert_recording_record(obs_db_id: int | None, meta_path: Path, data_path: Path, profile) -> None: + try: + from utils.database import get_db + from datetime import datetime, timezone + size = data_path.stat().st_size if data_path.exists() else 0 + with get_db() as conn: + conn.execute(''' + INSERT INTO sigmf_recordings + (observation_id, sigmf_data_path, sigmf_meta_path, size_bytes, + sample_rate, center_freq_hz, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ''', ( + obs_db_id, + str(data_path), + str(meta_path), + size, + profile.iq_sample_rate, + int(profile.frequency_mhz * 1e6), + datetime.now(timezone.utc).isoformat(), + )) + except Exception as e: + logger.warning(f"Failed to insert recording record: {e}") + + +# --------------------------------------------------------------------------- +# TLE lookup helpers +# --------------------------------------------------------------------------- + + +def _find_tle_by_norad(norad_id: int) -> tuple[str, str, str] | None: + """Search TLE cache for a given NORAD catalog number.""" + # Try live cache first + sources = [] + try: + from routes.satellite import _tle_cache # type: ignore[import] + if _tle_cache: + sources.append(_tle_cache) + except (ImportError, AttributeError): + pass + try: + from data.satellites import TLE_SATELLITES + sources.append(TLE_SATELLITES) + except ImportError: + pass + + target_id = str(norad_id).zfill(5) + + for source in sources: + for _key, tle in source.items(): + if not isinstance(tle, (tuple, list)) or len(tle) < 3: + continue + line1 = str(tle[1]) + # NORAD catalog number occupies chars 2-6 (0-indexed) of TLE line 1 + if len(line1) > 7: + catalog_str = line1[2:7].strip() + if catalog_str == target_id: + return (str(tle[0]), str(tle[1]), str(tle[2])) + + return None + + +# --------------------------------------------------------------------------- +# Timestamp parser (mirrors weather_sat_scheduler) +# --------------------------------------------------------------------------- + + +def _parse_utc_iso(value: str) -> datetime: + text = str(value).strip().replace('+00:00Z', 'Z') + if text.endswith('Z'): + text = text[:-1] + '+00:00' + dt = datetime.fromisoformat(text) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + else: + dt = dt.astimezone(timezone.utc) + return dt + + +# --------------------------------------------------------------------------- +# Singleton +# --------------------------------------------------------------------------- + +_scheduler: GroundStationScheduler | None = None +_scheduler_lock = threading.Lock() + + +def get_ground_station_scheduler() -> GroundStationScheduler: + """Get or create the global ground station scheduler.""" + global _scheduler + if _scheduler is None: + with _scheduler_lock: + if _scheduler is None: + _scheduler = GroundStationScheduler() + return _scheduler diff --git a/utils/rotator.py b/utils/rotator.py new file mode 100644 index 0000000..4ed1a7d --- /dev/null +++ b/utils/rotator.py @@ -0,0 +1,194 @@ +"""Hamlib rotctld TCP client for antenna rotator control. + +Communicates with a running ``rotctld`` daemon over TCP using the simple +line-based Hamlib protocol:: + + Client → ``P \\n`` + Server → ``RPRT 0\\n`` (success) + +If ``rotctld`` is not reachable the controller silently operates in a +disabled state — the rest of the system functions normally. + +Usage:: + + rotator = get_rotator() + if rotator.connect('127.0.0.1', 4533): + rotator.point_to(az=180.0, el=30.0) + rotator.park() + rotator.disconnect() +""" + +from __future__ import annotations + +import socket +import threading + +from utils.logging import get_logger + +logger = get_logger('intercept.rotator') + +DEFAULT_HOST = '127.0.0.1' +DEFAULT_PORT = 4533 +DEFAULT_TIMEOUT = 2.0 # seconds + + +class RotatorController: + """Thin wrapper around the rotctld TCP protocol.""" + + def __init__(self): + self._sock: socket.socket | None = None + self._lock = threading.Lock() + self._host = DEFAULT_HOST + self._port = DEFAULT_PORT + self._enabled = False + self._current_az: float = 0.0 + self._current_el: float = 0.0 + + # ------------------------------------------------------------------ + # Connection management + # ------------------------------------------------------------------ + + def connect(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> bool: + """Connect to rotctld. Returns True on success.""" + with self._lock: + self._host = host + self._port = port + try: + s = socket.create_connection((host, port), timeout=DEFAULT_TIMEOUT) + s.settimeout(DEFAULT_TIMEOUT) + self._sock = s + self._enabled = True + logger.info(f"Rotator connected to rotctld at {host}:{port}") + return True + except OSError as e: + logger.warning(f"Could not connect to rotctld at {host}:{port}: {e}") + self._sock = None + self._enabled = False + return False + + def disconnect(self) -> None: + """Close the TCP connection.""" + with self._lock: + if self._sock: + try: + self._sock.close() + except OSError: + pass + self._sock = None + self._enabled = False + logger.info("Rotator disconnected") + + # ------------------------------------------------------------------ + # Commands + # ------------------------------------------------------------------ + + def point_to(self, az: float, el: float) -> bool: + """Send a ``P`` (set position) command. + + Azimuth and elevation are clamped to valid ranges before sending. + + Returns True if the command was acknowledged. + """ + az = max(0.0, min(360.0, float(az))) + el = max(0.0, min(90.0, float(el))) + + ok = self._send_command(f'P {az:.1f} {el:.1f}') + if ok: + self._current_az = az + self._current_el = el + return ok + + def park(self) -> bool: + """Send rotator to park position (0° az, 0° el).""" + return self.point_to(0.0, 0.0) + + def get_position(self) -> tuple[float, float] | None: + """Query current position. Returns (az, el) or None on failure.""" + with self._lock: + if not self._enabled or self._sock is None: + return None + try: + self._sock.sendall(b'p\n') + resp = self._recv_line() + if resp and 'RPRT' not in resp: + parts = resp.split() + if len(parts) >= 2: + return float(parts[0]), float(parts[1]) + except Exception as e: + logger.warning(f"Rotator get_position failed: {e}") + self._enabled = False + self._sock = None + return None + + # ------------------------------------------------------------------ + # Status + # ------------------------------------------------------------------ + + @property + def enabled(self) -> bool: + return self._enabled + + def get_status(self) -> dict: + return { + 'enabled': self._enabled, + 'host': self._host, + 'port': self._port, + 'current_az': self._current_az, + 'current_el': self._current_el, + } + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _send_command(self, cmd: str) -> bool: + with self._lock: + if not self._enabled or self._sock is None: + return False + try: + self._sock.sendall((cmd + '\n').encode()) + resp = self._recv_line() + if resp and 'RPRT 0' in resp: + return True + logger.warning(f"Rotator unexpected response to '{cmd}': {resp!r}") + return False + except Exception as e: + logger.warning(f"Rotator command '{cmd}' failed: {e}") + self._enabled = False + try: + self._sock.close() + except OSError: + pass + self._sock = None + return False + + def _recv_line(self, max_bytes: int = 256) -> str: + """Read until newline (already holding _lock).""" + buf = b'' + assert self._sock is not None + while len(buf) < max_bytes: + c = self._sock.recv(1) + if not c: + break + buf += c + if c == b'\n': + break + return buf.decode('ascii', errors='replace').strip() + + +# --------------------------------------------------------------------------- +# Singleton +# --------------------------------------------------------------------------- + +_rotator: RotatorController | None = None +_rotator_lock = threading.Lock() + + +def get_rotator() -> RotatorController: + """Get or create the global rotator controller instance.""" + global _rotator + if _rotator is None: + with _rotator_lock: + if _rotator is None: + _rotator = RotatorController() + return _rotator diff --git a/utils/sigmf.py b/utils/sigmf.py new file mode 100644 index 0000000..a8ad1dd --- /dev/null +++ b/utils/sigmf.py @@ -0,0 +1,208 @@ +"""SigMF metadata and writer for IQ recordings. + +Writes raw CU8 I/Q data to ``.sigmf-data`` files and companion +``.sigmf-meta`` JSON metadata files conforming to the SigMF spec v1.x. + +Output directory: ``instance/ground_station/recordings/`` +""" + +from __future__ import annotations + +import json +import shutil +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from utils.logging import get_logger + +logger = get_logger('intercept.sigmf') + +# Abort recording if less than this many bytes are free on the disk +DEFAULT_MIN_FREE_BYTES = 500 * 1024 * 1024 # 500 MB + +OUTPUT_DIR = Path('instance/ground_station/recordings') + + +@dataclass +class SigMFMetadata: + """SigMF metadata block. + + Covers the fields most relevant for ground-station recordings. The + ``global`` block is always written; an ``annotations`` list is built + incrementally if callers add annotation events. + """ + + sample_rate: int + center_frequency_hz: float + datatype: str = 'cu8' # unsigned 8-bit I/Q (rtlsdr native) + description: str = '' + author: str = 'INTERCEPT ground station' + recorder: str = 'INTERCEPT' + hw: str = '' + norad_id: int = 0 + satellite_name: str = '' + latitude: float = 0.0 + longitude: float = 0.0 + annotations: list[dict[str, Any]] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + global_block: dict[str, Any] = { + 'core:datatype': self.datatype, + 'core:sample_rate': self.sample_rate, + 'core:version': '1.0.0', + 'core:recorder': self.recorder, + } + if self.description: + global_block['core:description'] = self.description + if self.author: + global_block['core:author'] = self.author + if self.hw: + global_block['core:hw'] = self.hw + if self.latitude or self.longitude: + global_block['core:geolocation'] = { + 'type': 'Point', + 'coordinates': [self.longitude, self.latitude], + } + + captures = [ + { + 'core:sample_start': 0, + 'core:frequency': self.center_frequency_hz, + 'core:datetime': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), + } + ] + + return { + 'global': global_block, + 'captures': captures, + 'annotations': self.annotations, + } + + +class SigMFWriter: + """Streams raw CU8 IQ bytes to a SigMF recording pair.""" + + def __init__( + self, + metadata: SigMFMetadata, + output_dir: Path | str | None = None, + stem: str | None = None, + min_free_bytes: int = DEFAULT_MIN_FREE_BYTES, + ): + self._metadata = metadata + self._output_dir = Path(output_dir) if output_dir else OUTPUT_DIR + self._stem = stem or _default_stem(metadata) + self._min_free_bytes = min_free_bytes + + self._data_path: Path | None = None + self._meta_path: Path | None = None + self._data_file = None + self._bytes_written = 0 + self._aborted = False + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def open(self) -> None: + """Create output directory and open the data file for writing.""" + self._output_dir.mkdir(parents=True, exist_ok=True) + self._data_path = self._output_dir / f'{self._stem}.sigmf-data' + self._meta_path = self._output_dir / f'{self._stem}.sigmf-meta' + self._data_file = open(self._data_path, 'wb') + self._bytes_written = 0 + self._aborted = False + logger.info(f"SigMFWriter opened: {self._data_path}") + + def write_chunk(self, raw: bytes) -> bool: + """Write a chunk of raw CU8 bytes. + + Returns False (and sets ``aborted``) if disk space drops below + the minimum threshold. + """ + if self._aborted or self._data_file is None: + return False + + # Check free space before writing + try: + usage = shutil.disk_usage(self._output_dir) + if usage.free < self._min_free_bytes: + logger.warning( + f"SigMF recording aborted — disk free " + f"({usage.free // (1024**2)} MB) below " + f"{self._min_free_bytes // (1024**2)} MB threshold" + ) + self._aborted = True + self._data_file.close() + self._data_file = None + return False + except Exception: + pass + + self._data_file.write(raw) + self._bytes_written += len(raw) + return True + + def close(self) -> tuple[Path, Path] | None: + """Flush data, write .sigmf-meta, close file. + + Returns ``(meta_path, data_path)`` on success, *None* if never + opened or already aborted before any data was written. + """ + if self._data_file is not None: + try: + self._data_file.flush() + self._data_file.close() + except Exception: + pass + self._data_file = None + + if self._data_path is None or self._meta_path is None: + return None + if self._bytes_written == 0 and not self._aborted: + # Nothing written — clean up empty file + self._data_path.unlink(missing_ok=True) + return None + + try: + meta_dict = self._metadata.to_dict() + self._meta_path.write_text( + json.dumps(meta_dict, indent=2), encoding='utf-8' + ) + except Exception as e: + logger.error(f"Failed to write SigMF metadata: {e}") + + logger.info( + f"SigMFWriter closed: {self._bytes_written} bytes → {self._data_path}" + ) + return self._meta_path, self._data_path + + @property + def bytes_written(self) -> int: + return self._bytes_written + + @property + def aborted(self) -> bool: + return self._aborted + + @property + def data_path(self) -> Path | None: + return self._data_path + + @property + def meta_path(self) -> Path | None: + return self._meta_path + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _default_stem(meta: SigMFMetadata) -> str: + ts = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ') + sat = (meta.satellite_name or 'unknown').replace(' ', '_').replace('/', '-') + freq_khz = int(meta.center_frequency_hz / 1000) + return f'{ts}_{sat}_{freq_khz}kHz' diff --git a/utils/sstv/sstv_decoder.py b/utils/sstv/sstv_decoder.py index 078781c..b650543 100644 --- a/utils/sstv/sstv_decoder.py +++ b/utils/sstv/sstv_decoder.py @@ -3,8 +3,8 @@ Provides the SSTVDecoder class that manages the full pipeline: rtl_fm subprocess -> audio stream -> VIS detection -> image decoding -> PNG output. -Also contains DopplerTracker and supporting dataclasses migrated from the -original monolithic utils/sstv.py. +DopplerTracker and DopplerInfo live in utils/doppler.py and are re-exported +here for backwards compatibility. """ from __future__ import annotations @@ -16,7 +16,7 @@ import subprocess import threading import time from dataclasses import dataclass -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from pathlib import Path from typing import Callable @@ -24,7 +24,13 @@ import numpy as np from utils.logging import get_logger -from .constants import ISS_SSTV_FREQ, SAMPLE_RATE, SPEED_OF_LIGHT +# DopplerTracker/DopplerInfo now live in the shared utils/doppler module. +# Import them here so existing code that does +# ``from utils.sstv.sstv_decoder import DopplerTracker`` +# continues to work unchanged. +from utils.doppler import DopplerInfo, DopplerTracker # noqa: F401 + +from .constants import ISS_SSTV_FREQ, SAMPLE_RATE from .dsp import goertzel_mag, normalize_audio from .image_decoder import SSTVImageDecoder from .modes import get_mode @@ -42,25 +48,10 @@ except ImportError: # Dataclasses # --------------------------------------------------------------------------- -@dataclass -class DopplerInfo: - """Doppler shift information.""" - frequency_hz: float - shift_hz: float - range_rate_km_s: float - elevation: float - azimuth: float - timestamp: datetime - - def to_dict(self) -> dict: - return { - 'frequency_hz': self.frequency_hz, - 'shift_hz': round(self.shift_hz, 1), - 'range_rate_km_s': round(self.range_rate_km_s, 3), - 'elevation': round(self.elevation, 1), - 'azimuth': round(self.azimuth, 1), - 'timestamp': self.timestamp.isoformat(), - } +# DopplerInfo is now defined in utils/doppler and imported at the top of +# this module. The re-export keeps any code that does +# from utils.sstv.sstv_decoder import DopplerInfo +# working without changes. @dataclass @@ -133,93 +124,8 @@ def _encode_scope_waveform(raw_samples: np.ndarray, window_size: int = 256) -> l return packed.tolist() -# --------------------------------------------------------------------------- -# DopplerTracker -# --------------------------------------------------------------------------- - -class DopplerTracker: - """Real-time Doppler shift calculator for satellite tracking. - - Uses skyfield to calculate the range rate between observer and satellite, - then computes the Doppler-shifted receive frequency. - """ - - def __init__(self, satellite_name: str = 'ISS'): - self._satellite_name = satellite_name - self._observer_lat: float | None = None - self._observer_lon: float | None = None - self._satellite = None - self._observer = None - self._ts = None - self._enabled = False - - def configure(self, latitude: float, longitude: float) -> bool: - """Configure the Doppler tracker with observer location.""" - try: - from skyfield.api import EarthSatellite, load, wgs84 - - from data.satellites import TLE_SATELLITES - - tle_data = TLE_SATELLITES.get(self._satellite_name) - if not tle_data: - logger.error(f"No TLE data for satellite: {self._satellite_name}") - return False - - self._ts = load.timescale() - self._satellite = EarthSatellite(tle_data[1], tle_data[2], tle_data[0], self._ts) - self._observer = wgs84.latlon(latitude, longitude) - self._observer_lat = latitude - self._observer_lon = longitude - self._enabled = True - - logger.info(f"Doppler tracker configured for {self._satellite_name} at ({latitude}, {longitude})") - return True - - except ImportError: - logger.warning("skyfield not available - Doppler tracking disabled") - return False - except Exception as e: - logger.error(f"Failed to configure Doppler tracker: {e}") - return False - - @property - def is_enabled(self) -> bool: - return self._enabled - - def calculate(self, nominal_freq_mhz: float) -> DopplerInfo | None: - """Calculate current Doppler-shifted frequency.""" - if not self._enabled or not self._satellite or not self._observer: - return None - - try: - t = self._ts.now() - difference = self._satellite - self._observer - topocentric = difference.at(t) - alt, az, distance = topocentric.altaz() - - dt_seconds = 1.0 - t_future = self._ts.utc(t.utc_datetime() + timedelta(seconds=dt_seconds)) - topocentric_future = difference.at(t_future) - _, _, distance_future = topocentric_future.altaz() - - range_rate_km_s = (distance_future.km - distance.km) / dt_seconds - nominal_freq_hz = nominal_freq_mhz * 1_000_000 - doppler_factor = 1 - (range_rate_km_s * 1000 / SPEED_OF_LIGHT) - corrected_freq_hz = nominal_freq_hz * doppler_factor - shift_hz = corrected_freq_hz - nominal_freq_hz - - return DopplerInfo( - frequency_hz=corrected_freq_hz, - shift_hz=shift_hz, - range_rate_km_s=range_rate_km_s, - elevation=alt.degrees, - azimuth=az.degrees, - timestamp=datetime.now(timezone.utc) - ) - - except Exception as e: - logger.error(f"Doppler calculation failed: {e}") - return None +# DopplerTracker is now imported from utils/doppler at the top of this module. +# Nothing to define here. # --------------------------------------------------------------------------- diff --git a/utils/weather_sat_predict.py b/utils/weather_sat_predict.py index 8159a0d..d075f5d 100644 --- a/utils/weather_sat_predict.py +++ b/utils/weather_sat_predict.py @@ -1,46 +1,45 @@ """Weather satellite pass prediction utility. -Shared prediction logic used by both the API endpoint and the auto-scheduler. -Delegates to utils.satellite_predict for core pass detection, then enriches -results with weather-satellite-specific metadata. +Self-contained pass prediction for NOAA/Meteor weather satellites. Uses +Skyfield's find_discrete() for AOS/LOS detection, then enriches results +with weather-satellite-specific metadata (name, frequency, mode, quality). """ from __future__ import annotations import datetime -import time from typing import Any +from skyfield.api import EarthSatellite, load, wgs84 +from skyfield.searchlib import find_discrete + +from data.satellites import TLE_SATELLITES from utils.logging import get_logger from utils.weather_sat import WEATHER_SATELLITES logger = get_logger('intercept.weather_sat_predict') -# Cache skyfield timescale to avoid re-downloading/re-parsing per request -_cached_timescale = None +# Live TLE cache — populated by routes/satellite.py at startup. +# Module-level so tests can patch it with patch('utils.weather_sat_predict._tle_cache', ...). +_tle_cache: dict = {} -def _get_timescale(): - global _cached_timescale - if _cached_timescale is None: - from skyfield.api import load - _cached_timescale = load.timescale() - return _cached_timescale +def _format_utc_iso(dt: datetime.datetime) -> str: + """Format a datetime as a UTC ISO 8601 string ending with 'Z'. + + Handles both aware (UTC) and naive (assumed UTC) datetimes, producing a + consistent ``YYYY-MM-DDTHH:MM:SSZ`` string without ``+00:00`` suffixes. + """ + if dt.tzinfo is not None: + dt = dt.astimezone(datetime.timezone.utc).replace(tzinfo=None) + return dt.strftime('%Y-%m-%dT%H:%M:%SZ') def _get_tle_source() -> dict: """Return the best available TLE source (live cache preferred over static data).""" - from data.satellites import TLE_SATELLITES - if not hasattr(_get_tle_source, '_ref') or \ - (time.time() - getattr(_get_tle_source, '_ref_ts', 0)) > 3600: - try: - from routes.satellite import _tle_cache - if _tle_cache: - _get_tle_source._ref = _tle_cache - _get_tle_source._ref_ts = time.time() - except ImportError: - pass - return getattr(_get_tle_source, '_ref', None) or TLE_SATELLITES + if _tle_cache: + return _tle_cache + return TLE_SATELLITES def predict_passes( @@ -58,69 +57,172 @@ def predict_passes( lon: Observer longitude (-180 to 180) hours: Hours ahead to predict (1-72) min_elevation: Minimum peak elevation in degrees (0-90) - include_trajectory: Include az/el trajectory points for polar plot - include_ground_track: Include lat/lon ground track points for map + include_trajectory: Include 30-point az/el trajectory for polar plot + include_ground_track: Include 60-point lat/lon ground track for map Returns: - List of pass dicts sorted by start time, enriched with weather-satellite - fields: id, satellite, name, frequency, mode, quality, riseAz, setAz, - maxElAz, and all standard fields from utils.satellite_predict. + List of pass dicts sorted by start time, each containing: + id, satellite, name, frequency, mode, startTime, startTimeISO, + endTimeISO, maxEl, maxElAz, riseAz, setAz, duration, quality, + and optionally trajectory/groundTrack. """ - from skyfield.api import wgs84 - from utils.satellite_predict import predict_passes as _predict_passes + # Raise ImportError early if skyfield has been disabled (e.g., in tests that + # patch sys.modules to simulate skyfield being unavailable). + import skyfield # noqa: F401 - tle_source = _get_tle_source() - ts = _get_timescale() + ts = load.timescale() observer = wgs84.latlon(lat, lon) t0 = ts.now() t1 = ts.utc(t0.utc_datetime() + datetime.timedelta(hours=hours)) + tle_source = _get_tle_source() all_passes: list[dict[str, Any]] = [] for sat_key, sat_info in WEATHER_SATELLITES.items(): if not sat_info['active']: continue - tle_data = tle_source.get(sat_info['tle_key']) - if not tle_data: + try: + tle_data = tle_source.get(sat_info['tle_key']) + if not tle_data: + continue + + satellite = EarthSatellite(tle_data[1], tle_data[2], tle_data[0], ts) + diff = satellite - observer + + def above_horizon(t, _diff=diff, _el=min_elevation): + alt, _, _ = _diff.at(t).altaz() + return alt.degrees > _el + + above_horizon.rough_period = 0.5 # Approximate orbital period in days + + times, is_rising = find_discrete(t0, t1, above_horizon) + + rise_t = None + for t, rising in zip(times, is_rising): + if rising: + rise_t = t + elif rise_t is not None: + _process_pass( + sat_key, sat_info, satellite, diff, ts, + rise_t, t, min_elevation, + include_trajectory, include_ground_track, + all_passes, + ) + rise_t = None + + except Exception as exc: + logger.debug('Error predicting passes for %s: %s', sat_key, exc) continue - sat_passes = _predict_passes( - tle_data, - observer, - ts, - t0, - t1, - min_el=min_elevation, - include_trajectory=include_trajectory, - include_ground_track=include_ground_track, - ) - - for p in sat_passes: - aos_iso = p['startTimeISO'] - try: - aos_dt = datetime.datetime.fromisoformat(aos_iso) - pass_id = f"{sat_key}_{aos_dt.strftime('%Y%m%d%H%M%S')}" - except Exception: - pass_id = f"{sat_key}_{aos_iso}" - - # Enrich with weather-satellite-specific fields - p['id'] = pass_id - p['satellite'] = sat_key - p['name'] = sat_info['name'] - p['frequency'] = sat_info['frequency'] - p['mode'] = sat_info['mode'] - # Backwards-compatible aliases - p['riseAz'] = p['aosAz'] - p['setAz'] = p['losAz'] - p['maxElAz'] = p['tcaAz'] - p['quality'] = ( - 'excellent' if p['maxEl'] >= 60 - else 'good' if p['maxEl'] >= 30 - else 'fair' - ) - - all_passes.extend(sat_passes) - all_passes.sort(key=lambda p: p['startTimeISO']) return all_passes + + +def _process_pass( + sat_key: str, + sat_info: dict, + satellite, + diff, + ts, + rise_t, + set_t, + min_elevation: float, + include_trajectory: bool, + include_ground_track: bool, + all_passes: list, +) -> None: + """Sample a rise/set interval, build the pass dict, append to all_passes.""" + rise_dt = rise_t.utc_datetime() + set_dt = set_t.utc_datetime() + duration_secs = (set_dt - rise_dt).total_seconds() + + # Sample 30 points across the pass to find max elevation and trajectory + N_TRAJ = 30 + max_el = 0.0 + max_el_az = 0.0 + traj_points = [] + + for i in range(N_TRAJ): + frac = i / (N_TRAJ - 1) if N_TRAJ > 1 else 0.0 + t_pt = ts.tt_jd(rise_t.tt + frac * (set_t.tt - rise_t.tt)) + try: + topo = diff.at(t_pt) + alt, az, _ = topo.altaz() + el = float(alt.degrees) + az_deg = float(az.degrees) + if el > max_el: + max_el = el + max_el_az = az_deg + if include_trajectory: + traj_points.append({'az': round(az_deg, 1), 'el': round(max(0.0, el), 1)}) + except Exception: + pass + + # Filter passes that never reach min_elevation + if max_el < min_elevation: + return + + # AOS and LOS azimuths + try: + rise_az = float(diff.at(rise_t).altaz()[1].degrees) + except Exception: + rise_az = 0.0 + + try: + set_az = float(diff.at(set_t).altaz()[1].degrees) + except Exception: + set_az = 0.0 + + aos_iso = _format_utc_iso(rise_dt) + try: + pass_id = f"{sat_key}_{rise_dt.strftime('%Y%m%d%H%M%S')}" + except Exception: + pass_id = f"{sat_key}_{aos_iso}" + + pass_dict: dict[str, Any] = { + 'id': pass_id, + 'satellite': sat_key, + 'name': sat_info['name'], + 'frequency': sat_info['frequency'], + 'mode': sat_info['mode'], + 'startTime': rise_dt.strftime('%Y-%m-%d %H:%M UTC'), + 'startTimeISO': aos_iso, + 'endTimeISO': _format_utc_iso(set_dt), + 'maxEl': round(max_el, 1), + 'maxElAz': round(max_el_az, 1), + 'riseAz': round(rise_az, 1), + 'setAz': round(set_az, 1), + 'duration': round(duration_secs, 1), + 'quality': ( + 'excellent' if max_el >= 60 + else 'good' if max_el >= 30 + else 'fair' + ), + # Backwards-compatible aliases used by weather_sat_scheduler and the frontend + 'aosAz': round(rise_az, 1), + 'losAz': round(set_az, 1), + 'tcaAz': round(max_el_az, 1), + } + + if include_trajectory: + pass_dict['trajectory'] = traj_points + + if include_ground_track: + ground_track = [] + N_TRACK = 60 + for i in range(N_TRACK): + frac = i / (N_TRACK - 1) if N_TRACK > 1 else 0.0 + t_pt = ts.tt_jd(rise_t.tt + frac * (set_t.tt - rise_t.tt)) + try: + geocentric = satellite.at(t_pt) + subpoint = wgs84.subpoint(geocentric) + ground_track.append({ + 'lat': round(float(subpoint.latitude.degrees), 4), + 'lon': round(float(subpoint.longitude.degrees), 4), + }) + except Exception: + pass + pass_dict['groundTrack'] = ground_track + + all_passes.append(pass_dict)