diff --git a/routes/satellite.py b/routes/satellite.py index 1ce501d..1693164 100644 --- a/routes/satellite.py +++ b/routes/satellite.py @@ -25,7 +25,7 @@ from utils.responses import api_error from utils.sse import sse_stream_fanout from utils.validation import validate_elevation, validate_hours, validate_latitude, validate_longitude -satellite_bp = Blueprint('satellite', __name__, url_prefix='/satellite') +satellite_bp = Blueprint("satellite", __name__, url_prefix="/satellite") # Cache skyfield timescale to avoid re-downloading/re-parsing per request _cached_timescale = None @@ -35,15 +35,17 @@ def _get_timescale(): global _cached_timescale if _cached_timescale is None: from skyfield.api import load + # Use bundled timescale data so the first request does not block on network I/O. _cached_timescale = load.timescale(builtin=True) return _cached_timescale + # Maximum response size for external requests (1MB) MAX_RESPONSE_SIZE = 1024 * 1024 # Allowed hosts for TLE fetching -ALLOWED_TLE_HOSTS = ['celestrak.org', 'celestrak.com', 'www.celestrak.org', 'www.celestrak.com'] +ALLOWED_TLE_HOSTS = ["celestrak.org", "celestrak.com", "www.celestrak.org", "www.celestrak.com"] # Local TLE cache (can be updated via API) _tle_cache = dict(TLE_SATELLITES) @@ -56,16 +58,16 @@ _TRACK_CACHE_TTL = 1800 # Thread pool for background ground-track computation (non-blocking from 1Hz tracker loop) from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor -_track_executor = _ThreadPoolExecutor(max_workers=2, thread_name_prefix='sat-track') +_track_executor = _ThreadPoolExecutor(max_workers=2, thread_name_prefix="sat-track") _track_in_progress: set = set() # cache keys currently being computed _pass_cache: dict = {} _PASS_CACHE_TTL = 300 _BUILTIN_NORAD_TO_KEY = { - 25544: 'ISS', - 40069: 'METEOR-M2', - 57166: 'METEOR-M2-3', - 59051: 'METEOR-M2-4', + 25544: "ISS", + 40069: "METEOR-M2", + 57166: "METEOR-M2-3", + 59051: "METEOR-M2-4", } @@ -76,11 +78,11 @@ def _load_db_satellites_into_cache(): db_sats = get_tracked_satellites() loaded = 0 for sat in db_sats: - if sat['tle_line1'] and sat['tle_line2']: + if sat["tle_line1"] and sat["tle_line2"]: # Use a cache key derived from name (sanitised) - cache_key = sat['name'].replace(' ', '-').upper() + cache_key = sat["name"].replace(" ", "-").upper() if cache_key not in _tle_cache: - _tle_cache[cache_key] = (sat['name'], sat['tle_line1'], sat['tle_line2']) + _tle_cache[cache_key] = (sat["name"], sat["tle_line1"], sat["tle_line2"]) loaded += 1 if loaded: logger.info(f"Loaded {loaded} user-tracked satellites into TLE cache") @@ -88,9 +90,14 @@ def _load_db_satellites_into_cache(): logger.warning(f"Failed to load DB satellites into TLE cache: {e}") +def get_cached_tle(name: str) -> tuple[str, str, str] | None: + """Return (name, line1, line2) from the live TLE cache, or None if not found.""" + return _tle_cache.get(name) + + def _normalize_satellite_name(value: object) -> str: """Normalize satellite identifiers for loose name matching.""" - return str(value or '').strip().replace(' ', '-').upper() + return str(value or "").strip().replace(" ", "-").upper() def _get_tracked_satellite_maps() -> tuple[dict[int, dict], dict[str, dict]]: @@ -100,17 +107,19 @@ def _get_tracked_satellite_maps() -> tuple[dict[int, dict], dict[str, dict]]: try: for sat in get_tracked_satellites(): try: - norad_id = int(sat['norad_id']) + norad_id = int(sat["norad_id"]) except (TypeError, ValueError): continue by_norad[norad_id] = sat - by_name[_normalize_satellite_name(sat.get('name'))] = sat + by_name[_normalize_satellite_name(sat.get("name"))] = sat except Exception as e: logger.warning(f"Failed to read tracked satellites for lookup: {e}") return by_norad, by_name -def _resolve_satellite_request(sat: object, tracked_by_norad: dict[int, dict], tracked_by_name: dict[str, dict]) -> tuple[str, int | None, tuple[str, str, str] | None]: +def _resolve_satellite_request( + sat: object, tracked_by_norad: dict[int, dict], tracked_by_name: dict[str, dict] +) -> tuple[str, int | None, tuple[str, str, str] | None]: """Resolve a satellite request to display name, NORAD ID, and TLE data.""" norad_id: int | None = None sat_key: str | None = None @@ -127,29 +136,33 @@ def _resolve_satellite_request(sat: object, tracked_by_norad: dict[int, dict], t if norad_id is not None: tracked = tracked_by_norad.get(norad_id) - sat_key = _BUILTIN_NORAD_TO_KEY.get(norad_id) or (tracked.get('name') if tracked else str(norad_id)) + sat_key = _BUILTIN_NORAD_TO_KEY.get(norad_id) or (tracked.get("name") if tracked else str(norad_id)) else: normalized = _normalize_satellite_name(sat_key) tracked = tracked_by_name.get(normalized) if tracked: try: - norad_id = int(tracked['norad_id']) + norad_id = int(tracked["norad_id"]) except (TypeError, ValueError): norad_id = None - sat_key = tracked.get('name') or sat_key + sat_key = tracked.get("name") or sat_key tle_data = None candidate_keys: list[str] = [] if sat_key: - candidate_keys.extend([ - sat_key, - _normalize_satellite_name(sat_key), - ]) - if tracked and tracked.get('name'): - candidate_keys.extend([ - tracked['name'], - _normalize_satellite_name(tracked['name']), - ]) + candidate_keys.extend( + [ + sat_key, + _normalize_satellite_name(sat_key), + ] + ) + if tracked and tracked.get("name"): + candidate_keys.extend( + [ + tracked["name"], + _normalize_satellite_name(tracked["name"]), + ] + ) seen: set[str] = set() for key in candidate_keys: @@ -164,9 +177,9 @@ def _resolve_satellite_request(sat: object, tracked_by_norad: dict[int, dict], t tle_data = _tle_cache[norm] break - if tle_data is None and tracked and tracked.get('tle_line1') and tracked.get('tle_line2'): - display_name = tracked.get('name') or sat_key or str(norad_id or 'UNKNOWN') - tle_data = (display_name, tracked['tle_line1'], tracked['tle_line2']) + if tle_data is None and tracked and tracked.get("tle_line1") and tracked.get("tle_line2"): + display_name = tracked.get("name") or sat_key or str(norad_id or "UNKNOWN") + tle_data = (display_name, tracked["tle_line1"], tracked["tle_line2"]) _tle_cache[_normalize_satellite_name(display_name)] = tle_data if tle_data is None and sat_key: @@ -178,7 +191,11 @@ def _resolve_satellite_request(sat: object, tracked_by_norad: dict[int, dict], t display_name = _BUILTIN_NORAD_TO_KEY.get(norad_id or -1) if not display_name: - display_name = (tracked.get('name') if tracked else None) or (tle_data[0] if tle_data else None) or (sat_key if sat_key else str(norad_id or 'UNKNOWN')) + display_name = ( + (tracked.get("name") if tracked else None) + or (tle_data[0] if tle_data else None) + or (sat_key if sat_key else str(norad_id or "UNKNOWN")) + ) return display_name, norad_id, tle_data @@ -229,10 +246,10 @@ def _start_satellite_tracker(): positions = [] for sat_rec in tracked: - sat_name = sat_rec['name'] - norad_id = sat_rec.get('norad_id', 0) - tle1 = sat_rec.get('tle_line1') - tle2 = sat_rec.get('tle_line2') + sat_name = sat_rec["name"] + norad_id = sat_rec.get("norad_id", 0) + tle1 = sat_rec.get("tle_line1") + tle2 = sat_rec.get("tle_line2") if not tle1 or not tle2: # Fall back to TLE cache. Try the builtin NORAD-ID key first # (e.g. 'ISS'), then the name-derived key as a last resort. @@ -241,7 +258,11 @@ def _start_satellite_tracker(): except (TypeError, ValueError): norad_int = 0 builtin_key = _BUILTIN_NORAD_TO_KEY.get(norad_int) - cache_key = builtin_key if (builtin_key and builtin_key in _tle_cache) else sat_name.replace(' ', '-').upper() + cache_key = ( + builtin_key + if (builtin_key and builtin_key in _tle_cache) + else sat_name.replace(" ", "-").upper() + ) if cache_key not in _tle_cache: continue tle_entry = _tle_cache[cache_key] @@ -258,11 +279,11 @@ def _start_satellite_tracker(): # visible) are intentionally omitted here — the per-client HTTP poll # at /satellite/position owns those using the client's actual location. pos = { - 'satellite': sat_name, - 'norad_id': norad_id, - 'lat': float(subpoint.latitude.degrees), - 'lon': float(subpoint.longitude.degrees), - 'altitude': float(subpoint.elevation.km), + "satellite": sat_name, + "norad_id": norad_id, + "lat": float(subpoint.latitude.degrees), + "lon": float(subpoint.longitude.degrees), + "altitude": float(subpoint.elevation.km), } # Ground track with caching (90 points, TTL 1800s). @@ -272,7 +293,7 @@ def _start_satellite_tracker(): cache_key_track = (sat_name, tle1[:20]) cached = _track_cache.get(cache_key_track) if cached and (time.time() - cached[1]) < _TRACK_CACHE_TTL: - pos['groundTrack'] = cached[0] + pos["groundTrack"] = cached[0] elif cache_key_track not in _track_in_progress: _track_in_progress.add(cache_key_track) _sat_ref = satellite @@ -287,11 +308,13 @@ def _start_satellite_tracker(): try: geo = _sat.at(t_point) sp = wgs84.subpoint(geo) - track.append({ - 'lat': float(sp.latitude.degrees), - 'lon': float(sp.longitude.degrees), - 'past': minutes_offset < 0, - }) + track.append( + { + "lat": float(sp.latitude.degrees), + "lon": float(sp.longitude.degrees), + "past": minutes_offset < 0, + } + ) except Exception: continue _track_cache[_key] = (track, time.time()) @@ -309,9 +332,9 @@ def _start_satellite_tracker(): if positions: msg = { - 'type': 'positions', - 'positions': positions, - 'timestamp': datetime.utcnow().isoformat(), + "type": "positions", + "positions": positions, + "timestamp": datetime.utcnow().isoformat(), } try: app_module.satellite_queue.put_nowait(msg) @@ -329,6 +352,7 @@ _TLE_REFRESH_INTERVAL_SECONDS = 24 * 60 * 60 # 24 hours def init_tle_auto_refresh(): """Initialize TLE auto-refresh. Called by app.py after initialization.""" + def _schedule_next_tle_refresh(delay: float = _TLE_REFRESH_INTERVAL_SECONDS) -> None: t = threading.Timer(delay, _auto_refresh_tle) t.daemon = True @@ -354,7 +378,7 @@ def init_tle_auto_refresh(): tracker_thread = threading.Thread( target=_start_satellite_tracker, daemon=True, - name='satellite-tracker', + name="satellite-tracker", ) tracker_thread.start() logger.info("Satellite tracker thread launched") @@ -373,26 +397,26 @@ def _fetch_iss_realtime(observer_lat: float | None = None, observer_lon: float | # Try primary API: Where The ISS At try: - response = requests.get('https://api.wheretheiss.at/v1/satellites/25544', timeout=5) + response = requests.get("https://api.wheretheiss.at/v1/satellites/25544", timeout=5) if response.status_code == 200: data = response.json() - iss_lat = float(data['latitude']) - iss_lon = float(data['longitude']) - iss_alt = float(data.get('altitude', 420)) - source = 'wheretheiss' + iss_lat = float(data["latitude"]) + iss_lon = float(data["longitude"]) + iss_alt = float(data.get("altitude", 420)) + source = "wheretheiss" except Exception as e: logger.debug(f"Where The ISS At API failed: {e}") # Try fallback API: Open Notify if iss_lat is None: try: - response = requests.get('http://api.open-notify.org/iss-now.json', timeout=5) + response = requests.get("http://api.open-notify.org/iss-now.json", timeout=5) if response.status_code == 200: data = response.json() - if data.get('message') == 'success': - iss_lat = float(data['iss_position']['latitude']) - iss_lon = float(data['iss_position']['longitude']) - source = 'open-notify' + if data.get("message") == "success": + iss_lat = float(data["iss_position"]["latitude"]) + iss_lon = float(data["iss_position"]["longitude"]) + source = "open-notify" except Exception as e: logger.debug(f"Open Notify API failed: {e}") @@ -400,12 +424,12 @@ def _fetch_iss_realtime(observer_lat: float | None = None, observer_lon: float | return None result = { - 'satellite': 'ISS', - 'norad_id': 25544, - 'lat': iss_lat, - 'lon': iss_lon, - 'altitude': iss_alt, - 'source': source + "satellite": "ISS", + "norad_id": 25544, + "lat": iss_lat, + "lon": iss_lon, + "altitude": iss_alt, + "source": source, } # Calculate observer-relative data if location provided @@ -422,7 +446,7 @@ def _fetch_iss_realtime(observer_lat: float | None = None, observer_lon: float | # Haversine for ground distance dlat = lat2 - lat1 dlon = lon2 - lon1 - a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2 + a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2 c = 2 * math.asin(math.sqrt(a)) ground_distance = earth_radius * c @@ -441,39 +465,38 @@ def _fetch_iss_realtime(observer_lat: float | None = None, observer_lon: float | azimuth = math.degrees(math.atan2(y, x)) azimuth = (azimuth + 360) % 360 - result['elevation'] = round(elevation, 1) - result['azimuth'] = round(azimuth, 1) - result['distance'] = round(slant_range, 1) - result['visible'] = elevation > 0 + result["elevation"] = round(elevation, 1) + result["azimuth"] = round(azimuth, 1) + result["distance"] = round(slant_range, 1) + result["visible"] = elevation > 0 return result -@satellite_bp.route('/dashboard') +@satellite_bp.route("/dashboard") def satellite_dashboard(): """Popout satellite tracking dashboard.""" - embedded = request.args.get('embedded', 'false') == 'true' - response = make_response(render_template( - 'satellite_dashboard.html', - shared_observer_location=SHARED_OBSERVER_LOCATION_ENABLED, - embedded=embedded, - )) - response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' - response.headers['Pragma'] = 'no-cache' - response.headers['Expires'] = '0' + embedded = request.args.get("embedded", "false") == "true" + response = make_response( + render_template( + "satellite_dashboard.html", + shared_observer_location=SHARED_OBSERVER_LOCATION_ENABLED, + embedded=embedded, + ) + ) + response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" + response.headers["Pragma"] = "no-cache" + response.headers["Expires"] = "0" return response -@satellite_bp.route('/predict', methods=['POST']) +@satellite_bp.route("/predict", methods=["POST"]) def predict_passes(): """Calculate satellite passes using skyfield.""" try: from skyfield.api import EarthSatellite, wgs84 except ImportError: - return jsonify({ - 'status': 'error', - 'message': 'skyfield library not installed. Run: pip install skyfield' - }), 503 + return jsonify({"status": "error", "message": "skyfield library not installed. Run: pip install skyfield"}), 503 from utils.satellite_predict import predict_passes as _predict_passes @@ -481,21 +504,21 @@ def predict_passes(): try: # Validate inputs - lat = validate_latitude(data.get('latitude', data.get('lat', 51.5074))) - lon = validate_longitude(data.get('longitude', data.get('lon', -0.1278))) - hours = validate_hours(data.get('hours', 24)) - min_el = validate_elevation(data.get('minEl', 10)) + lat = validate_latitude(data.get("latitude", data.get("lat", 51.5074))) + lon = validate_longitude(data.get("longitude", data.get("lon", -0.1278))) + hours = validate_hours(data.get("hours", 24)) + min_el = validate_elevation(data.get("minEl", 10)) except ValueError as e: return api_error(str(e), 400) try: - sat_input = data.get('satellites', ['ISS', 'METEOR-M2-3', 'METEOR-M2-4']) + sat_input = data.get("satellites", ["ISS", "METEOR-M2-3", "METEOR-M2-4"]) passes = [] colors = { - 'ISS': '#00ffff', - 'METEOR-M2': '#9370DB', - 'METEOR-M2-3': '#ff00ff', - 'METEOR-M2-4': '#00ff88', + "ISS": "#00ffff", + "METEOR-M2": "#9370DB", + "METEOR-M2-3": "#ff00ff", + "METEOR-M2-4": "#00ff88", } tracked_by_norad, tracked_by_name = _get_tracked_satellite_maps() @@ -511,21 +534,25 @@ def predict_passes(): resolved_satellites.append((sat_name, norad_id or 0, tle_data)) if not resolved_satellites: - return jsonify({ - 'status': 'success', - 'passes': [], - 'cached': False, - }) + return jsonify( + { + "status": "success", + "passes": [], + "cached": False, + } + ) cache_key = _make_pass_cache_key(lat, lon, hours, min_el, resolved_satellites) cached = _pass_cache.get(cache_key) now_ts = time.time() if cached and (now_ts - cached[1]) < _PASS_CACHE_TTL: - return jsonify({ - 'status': 'success', - 'passes': cached[0], - 'cached': True, - }) + return jsonify( + { + "status": "success", + "passes": cached[0], + "cached": True, + } + ) ts = _get_timescale() observer = wgs84.latlon(lat, lon) @@ -539,19 +566,19 @@ def predict_passes(): geo = satellite.at(t0) sp = wgs84.subpoint(geo) current_pos = { - 'lat': float(sp.latitude.degrees), - 'lon': float(sp.longitude.degrees), - 'altitude': float(sp.elevation.km), + "lat": float(sp.latitude.degrees), + "lon": float(sp.longitude.degrees), + "altitude": float(sp.elevation.km), } # Add observer-relative data using the request's observer location try: diff = satellite - observer topo = diff.at(t0) alt_deg, az_deg, dist_km = topo.altaz() - current_pos['elevation'] = round(float(alt_deg.degrees), 1) - current_pos['azimuth'] = round(float(az_deg.degrees), 1) - current_pos['distance'] = round(float(dist_km.km), 1) - current_pos['visible'] = bool(alt_deg.degrees > 0) + current_pos["elevation"] = round(float(alt_deg.degrees), 1) + current_pos["azimuth"] = round(float(az_deg.degrees), 1) + current_pos["distance"] = round(float(dist_km.km), 1) + current_pos["visible"] = bool(alt_deg.degrees > 0) except Exception: pass except Exception: @@ -559,59 +586,63 @@ def predict_passes(): sat_passes = _predict_passes(tle_data, observer, ts, t0, t1, min_el=min_el) for p in sat_passes: - p['satellite'] = sat_name - p['norad'] = norad_id - p['color'] = colors.get(sat_name, '#00ff00') + p["satellite"] = sat_name + p["norad"] = norad_id + p["color"] = colors.get(sat_name, "#00ff00") if current_pos: - p['currentPos'] = current_pos + p["currentPos"] = current_pos passes.extend(sat_passes) - passes.sort(key=lambda p: p['startTimeISO']) + passes.sort(key=lambda p: p["startTimeISO"]) # Only cache non-empty results to avoid serving a stale empty response # on the next request (which could happen if TLEs were too old to produce # any events — the auto-refresh will update them shortly after startup). if passes: _pass_cache[cache_key] = (passes, now_ts) - return jsonify({ - 'status': 'success', - 'passes': passes, - 'cached': False, - }) + return jsonify( + { + "status": "success", + "passes": passes, + "cached": False, + } + ) except Exception as exc: - logger.exception('Satellite pass calculation failed') - if 'cache_key' in locals(): + logger.exception("Satellite pass calculation failed") + if "cache_key" in locals(): stale_cached = _pass_cache.get(cache_key) if stale_cached and stale_cached[0]: - return jsonify({ - 'status': 'success', - 'passes': stale_cached[0], - 'cached': True, - 'stale': True, - }) - return api_error(f'Failed to calculate passes: {exc}', 500) + return jsonify( + { + "status": "success", + "passes": stale_cached[0], + "cached": True, + "stale": True, + } + ) + return api_error(f"Failed to calculate passes: {exc}", 500) -@satellite_bp.route('/position', methods=['POST']) +@satellite_bp.route("/position", methods=["POST"]) def get_satellite_position(): """Get real-time positions of satellites.""" try: from skyfield.api import EarthSatellite, wgs84 except ImportError: - return api_error('skyfield not installed', 503) + return api_error("skyfield not installed", 503) data = request.json or {} # Validate inputs try: - lat = validate_latitude(data.get('latitude', data.get('lat', 51.5074))) - lon = validate_longitude(data.get('longitude', data.get('lon', -0.1278))) + lat = validate_latitude(data.get("latitude", data.get("lat", 51.5074))) + lon = validate_longitude(data.get("longitude", data.get("lon", -0.1278))) except ValueError as e: return api_error(str(e), 400) - sat_input = data.get('satellites', []) - include_track = bool(data.get('includeTrack', True)) - prefer_realtime_api = bool(data.get('preferRealtimeApi', False)) + sat_input = data.get("satellites", []) + include_track = bool(data.get("includeTrack", True)) + prefer_realtime_api = bool(data.get("preferRealtimeApi", False)) observer = wgs84.latlon(lat, lon) ts = None @@ -625,17 +656,17 @@ def get_satellite_position(): sat_name, norad_id, tle_data = _resolve_satellite_request(sat, tracked_by_norad, tracked_by_name) # Optional special handling for ISS. The dashboard does not enable this # because external API latency can make live updates stall. - if prefer_realtime_api and (norad_id == 25544 or sat_name == 'ISS'): + if prefer_realtime_api and (norad_id == 25544 or sat_name == "ISS"): iss_data = _fetch_iss_realtime(lat, lon) if iss_data: # Add orbit track if requested (using TLE for track prediction) - if include_track and 'ISS' in _tle_cache: + if include_track and "ISS" in _tle_cache: try: if ts is None: ts = _get_timescale() now = ts.now() now_dt = now.utc_datetime() - tle_data = _tle_cache['ISS'] + tle_data = _tle_cache["ISS"] satellite = EarthSatellite(tle_data[1], tle_data[2], tle_data[0], ts) orbit_track = [] for minutes_offset in range(-45, 46, 1): @@ -643,14 +674,16 @@ def get_satellite_position(): try: geo = satellite.at(t_point) sp = wgs84.subpoint(geo) - orbit_track.append({ - 'lat': float(sp.latitude.degrees), - 'lon': float(sp.longitude.degrees), - 'past': minutes_offset < 0 - }) + orbit_track.append( + { + "lat": float(sp.latitude.degrees), + "lon": float(sp.longitude.degrees), + "past": minutes_offset < 0, + } + ) except Exception: continue - iss_data['track'] = orbit_track + iss_data["track"] = orbit_track except Exception: pass positions.append(iss_data) @@ -675,15 +708,15 @@ def get_satellite_position(): alt, az, distance = topocentric.altaz() pos_data = { - 'satellite': sat_name, - 'norad_id': norad_id, - 'lat': float(subpoint.latitude.degrees), - 'lon': float(subpoint.longitude.degrees), - 'altitude': float(subpoint.elevation.km), - 'elevation': float(alt.degrees), - 'azimuth': float(az.degrees), - 'distance': float(distance.km), - 'visible': bool(alt.degrees > 0) + "satellite": sat_name, + "norad_id": norad_id, + "lat": float(subpoint.latitude.degrees), + "lon": float(subpoint.longitude.degrees), + "altitude": float(subpoint.elevation.km), + "elevation": float(alt.degrees), + "azimuth": float(az.degrees), + "distance": float(distance.km), + "visible": bool(alt.degrees > 0), } if include_track: @@ -693,52 +726,52 @@ def get_satellite_position(): try: geo = satellite.at(t_point) sp = wgs84.subpoint(geo) - orbit_track.append({ - 'lat': float(sp.latitude.degrees), - 'lon': float(sp.longitude.degrees), - 'past': minutes_offset < 0 - }) + orbit_track.append( + { + "lat": float(sp.latitude.degrees), + "lon": float(sp.longitude.degrees), + "past": minutes_offset < 0, + } + ) except Exception: continue - pos_data['track'] = orbit_track - pos_data['groundTrack'] = orbit_track + pos_data["track"] = orbit_track + pos_data["groundTrack"] = orbit_track positions.append(pos_data) except Exception: continue - return jsonify({ - 'status': 'success', - 'positions': positions, - 'timestamp': datetime.utcnow().isoformat() - }) + return jsonify({"status": "success", "positions": positions, "timestamp": datetime.utcnow().isoformat()}) -@satellite_bp.route('/transmitters/') +@satellite_bp.route("/transmitters/") def get_transmitters_endpoint(norad_id: int): """Return SatNOGS transmitter data for a satellite by NORAD ID.""" from utils.satnogs import get_transmitters + transmitters = get_transmitters(norad_id) - return jsonify({'status': 'success', 'norad_id': norad_id, 'transmitters': transmitters}) + return jsonify({"status": "success", "norad_id": norad_id, "transmitters": transmitters}) -@satellite_bp.route('/parse-packet', methods=['POST']) +@satellite_bp.route("/parse-packet", methods=["POST"]) def parse_packet(): """Parse a raw satellite telemetry packet (base64-encoded).""" import base64 from utils.satellite_telemetry import auto_parse + data = request.json or {} try: - raw_bytes = base64.b64decode(data.get('data', '')) + raw_bytes = base64.b64decode(data.get("data", "")) except Exception: - return api_error('Invalid base64 data', 400) + return api_error("Invalid base64 data", 400) result = auto_parse(raw_bytes) - return jsonify({'status': 'success', 'parsed': result}) + return jsonify({"status": "success", "parsed": result}) -@satellite_bp.route('/stream_satellite') +@satellite_bp.route("/stream_satellite") def stream_satellite() -> Response: """SSE endpoint streaming live satellite positions from the background tracker.""" import app as app_module @@ -746,15 +779,15 @@ def stream_satellite() -> Response: response = Response( sse_stream_fanout( source_queue=app_module.satellite_queue, - channel_key='satellite', + channel_key="satellite", timeout=1.0, keepalive_interval=30.0, ), - mimetype='text/event-stream', + mimetype="text/event-stream", ) - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' - response.headers['Connection'] = 'keep-alive' + response.headers["Cache-Control"] = "no-cache" + response.headers["X-Accel-Buffering"] = "no" + response.headers["Connection"] = "keep-alive" return response @@ -768,25 +801,25 @@ def refresh_tle_data() -> list: global _tle_cache name_mappings = { - 'ISS (ZARYA)': 'ISS', - 'NOAA 15': 'NOAA-15', - 'NOAA 18': 'NOAA-18', - 'NOAA 19': 'NOAA-19', - 'NOAA 20 (JPSS-1)': 'NOAA-20', - 'NOAA 21 (JPSS-2)': 'NOAA-21', - 'METEOR-M 2': 'METEOR-M2', - 'METEOR-M2 3': 'METEOR-M2-3', - 'METEOR-M2 4': 'METEOR-M2-4' + "ISS (ZARYA)": "ISS", + "NOAA 15": "NOAA-15", + "NOAA 18": "NOAA-18", + "NOAA 19": "NOAA-19", + "NOAA 20 (JPSS-1)": "NOAA-20", + "NOAA 21 (JPSS-2)": "NOAA-21", + "METEOR-M 2": "METEOR-M2", + "METEOR-M2 3": "METEOR-M2-3", + "METEOR-M2 4": "METEOR-M2-4", } updated = [] - for group in ['stations', 'weather', 'noaa']: - url = f'https://celestrak.org/NORAD/elements/gp.php?GROUP={group}&FORMAT=tle' + for group in ["stations", "weather", "noaa"]: + url = f"https://celestrak.org/NORAD/elements/gp.php?GROUP={group}&FORMAT=tle" try: with urllib.request.urlopen(url, timeout=15) as response: - content = response.read().decode('utf-8') - lines = content.strip().split('\n') + content = response.read().decode("utf-8") + lines = content.strip().split("\n") i = 0 while i + 2 < len(lines): @@ -794,7 +827,7 @@ def refresh_tle_data() -> list: line1 = lines[i + 1].strip() line2 = lines[i + 2].strip() - if not (line1.startswith('1 ') and line2.startswith('2 ')): + if not (line1.startswith("1 ") and line2.startswith("2 ")): i += 1 continue @@ -810,43 +843,87 @@ def refresh_tle_data() -> list: logger.warning(f"Error fetching TLE group {group}: {e}") continue + if updated: + _persist_tle_cache() + return updated -@satellite_bp.route('/update-tle', methods=['POST']) +def _persist_tle_cache() -> None: + """Write updated TLE data back to data/satellites.py so restarts don't reload stale values.""" + import os + + satellites_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "satellites.py") + try: + lines = [ + "# TLE data for satellite tracking (updated periodically)\n", + '# To update: click "Update TLE" in satellite dashboard or SSTV mode\n', + "# Data source: CelesTrak (celestrak.org)\n", + "TLE_SATELLITES = {\n", + ] + for key, val in _tle_cache.items(): + name, line1, line2 = val + escaped_name = name.replace("'", "\\'") + escaped_key = key.replace("'", "\\'") + lines.append(f" '{escaped_key}': ('{escaped_name}',\n") + lines.append(f" '{line1}',\n") + lines.append(f" '{line2}'),\n") + lines.append("}\n") + with open(satellites_path, "w") as f: + f.writelines(lines) + logger.info(f"Persisted {len(_tle_cache)} TLE entries to data/satellites.py") + except Exception as e: + logger.warning(f"Failed to persist TLE cache to disk: {e}") + + +@satellite_bp.route("/update-tle", methods=["POST"]) def update_tle(): """Update TLE data from CelesTrak (API endpoint).""" try: updated = refresh_tle_data() - return jsonify({ - 'status': 'success', - 'updated': updated - }) + return jsonify({"status": "success", "updated": updated}) except Exception as e: logger.error(f"Error updating TLE data: {e}") - return api_error('TLE update failed') + return api_error("TLE update failed") -@satellite_bp.route('/celestrak/') +@satellite_bp.route("/celestrak/") def fetch_celestrak(category): """Fetch TLE data from CelesTrak for a category.""" valid_categories = [ - 'stations', 'weather', 'noaa', 'goes', 'resource', 'sarsat', - 'dmc', 'tdrss', 'argos', 'planet', 'spire', 'geo', 'intelsat', - 'ses', 'iridium', 'iridium-NEXT', 'starlink', 'oneweb', - 'amateur', 'cubesat', 'visual' + "stations", + "weather", + "noaa", + "goes", + "resource", + "sarsat", + "dmc", + "tdrss", + "argos", + "planet", + "spire", + "geo", + "intelsat", + "ses", + "iridium", + "iridium-NEXT", + "starlink", + "oneweb", + "amateur", + "cubesat", + "visual", ] if category not in valid_categories: - return api_error(f'Invalid category. Valid: {valid_categories}') + return api_error(f"Invalid category. Valid: {valid_categories}") try: - url = f'https://celestrak.org/NORAD/elements/gp.php?GROUP={category}&FORMAT=tle' + url = f"https://celestrak.org/NORAD/elements/gp.php?GROUP={category}&FORMAT=tle" with urllib.request.urlopen(url, timeout=10) as response: - content = response.read().decode('utf-8') + content = response.read().decode("utf-8") satellites = [] - lines = content.strip().split('\n') + lines = content.strip().split("\n") i = 0 while i + 2 < len(lines): @@ -854,128 +931,126 @@ def fetch_celestrak(category): line1 = lines[i + 1].strip() line2 = lines[i + 2].strip() - if not (line1.startswith('1 ') and line2.startswith('2 ')): + if not (line1.startswith("1 ") and line2.startswith("2 ")): i += 1 continue try: norad_id = int(line1[2:7]) - satellites.append({ - 'name': name, - 'norad': norad_id, - 'tle1': line1, - 'tle2': line2 - }) + satellites.append({"name": name, "norad": norad_id, "tle1": line1, "tle2": line2}) except (ValueError, IndexError): pass i += 3 - return jsonify({ - 'status': 'success', - 'category': category, - 'satellites': satellites - }) + return jsonify({"status": "success", "category": category, "satellites": satellites}) except Exception as e: logger.error(f"Error fetching CelesTrak data: {e}") - return api_error('Failed to fetch satellite data') + return api_error("Failed to fetch satellite data") # ============================================================================= # Tracked Satellites CRUD # ============================================================================= -@satellite_bp.route('/tracked', methods=['GET']) + +@satellite_bp.route("/tracked", methods=["GET"]) def list_tracked_satellites(): """Return all tracked satellites from the database.""" - enabled_only = request.args.get('enabled', '').lower() == 'true' + enabled_only = request.args.get("enabled", "").lower() == "true" sats = get_tracked_satellites(enabled_only=enabled_only) - return jsonify({'status': 'success', 'satellites': sats}) + return jsonify({"status": "success", "satellites": sats}) -@satellite_bp.route('/tracked', methods=['POST']) +@satellite_bp.route("/tracked", methods=["POST"]) def add_tracked_satellites_endpoint(): """Add one or more tracked satellites.""" global _tle_cache data = request.get_json(silent=True) if not data: - return api_error('No data provided', 400) + return api_error("No data provided", 400) # Accept a single satellite dict or a list sat_list = data if isinstance(data, list) else [data] normalized: list[dict] = [] for sat in sat_list: - norad_id = str(sat.get('norad_id', sat.get('norad', ''))) - name = sat.get('name', '') + norad_id = str(sat.get("norad_id", sat.get("norad", ""))) + name = sat.get("name", "") if not norad_id or not name: continue - tle1 = sat.get('tle_line1', sat.get('tle1')) - tle2 = sat.get('tle_line2', sat.get('tle2')) - enabled = sat.get('enabled', True) + tle1 = sat.get("tle_line1", sat.get("tle1")) + tle2 = sat.get("tle_line2", sat.get("tle2")) + enabled = sat.get("enabled", True) - normalized.append({ - 'norad_id': norad_id, - 'name': name, - 'tle_line1': tle1, - 'tle_line2': tle2, - 'enabled': bool(enabled), - 'builtin': False, - }) + normalized.append( + { + "norad_id": norad_id, + "name": name, + "tle_line1": tle1, + "tle_line2": tle2, + "enabled": bool(enabled), + "builtin": False, + } + ) # Also inject into TLE cache if we have TLE data if tle1 and tle2: - cache_key = name.replace(' ', '-').upper() + cache_key = name.replace(" ", "-").upper() _tle_cache[cache_key] = (name, tle1, tle2) # Single inserts preserve previous behavior; list inserts use DB-level bulk path. if len(normalized) == 1: sat = normalized[0] - added = 1 if add_tracked_satellite( - sat['norad_id'], - sat['name'], - sat.get('tle_line1'), - sat.get('tle_line2'), - sat.get('enabled', True), - sat.get('builtin', False), - ) else 0 + added = ( + 1 + if add_tracked_satellite( + sat["norad_id"], + sat["name"], + sat.get("tle_line1"), + sat.get("tle_line2"), + sat.get("enabled", True), + sat.get("builtin", False), + ) + else 0 + ) else: added = bulk_add_tracked_satellites(normalized) response_payload = { - 'status': 'success', - 'added': added, - 'processed': len(normalized), + "status": "success", + "added": added, + "processed": len(normalized), } # Returning all tracked satellites for very large imports can stall the UI. - include_satellites = request.args.get('include_satellites', '').lower() == 'true' + include_satellites = request.args.get("include_satellites", "").lower() == "true" if include_satellites or len(normalized) <= 32: - response_payload['satellites'] = get_tracked_satellites() + response_payload["satellites"] = get_tracked_satellites() return jsonify(response_payload) -@satellite_bp.route('/tracked/', methods=['PUT']) +@satellite_bp.route("/tracked/", methods=["PUT"]) def update_tracked_satellite_endpoint(norad_id): """Update the enabled state of a tracked satellite.""" data = request.json or {} - enabled = data.get('enabled') + enabled = data.get("enabled") if enabled is None: - return api_error('Missing enabled field', 400) + return api_error("Missing enabled field", 400) ok = update_tracked_satellite(str(norad_id), bool(enabled)) if ok: - return jsonify({'status': 'success'}) - return api_error('Satellite not found', 404) + return jsonify({"status": "success"}) + return api_error("Satellite not found", 404) -@satellite_bp.route('/tracked/', methods=['DELETE']) +@satellite_bp.route("/tracked/", methods=["DELETE"]) def delete_tracked_satellite_endpoint(norad_id): """Remove a tracked satellite by NORAD ID.""" ok, msg = remove_tracked_satellite(str(norad_id)) if ok: - return jsonify({'status': 'success', 'message': msg}) - status_code = 403 if 'builtin' in msg.lower() else 404 + return jsonify({"status": "success", "message": msg}) + status_code = 403 if "builtin" in msg.lower() else 404 return api_error(msg, status_code) diff --git a/routes/sstv.py b/routes/sstv.py index 2ac73ef..00a3f03 100644 --- a/routes/sstv.py +++ b/routes/sstv.py @@ -16,6 +16,7 @@ from typing import Any from flask import Blueprint, Response, jsonify, request, send_file import app as app_module +from routes.satellite import get_cached_tle from utils.event_pipeline import process_event from utils.logging import get_logger from utils.responses import api_error @@ -26,13 +27,13 @@ from utils.sstv import ( is_sstv_available, ) -logger = get_logger('intercept.sstv') +logger = get_logger("intercept.sstv") -sstv_bp = Blueprint('sstv', __name__, url_prefix='/sstv') +sstv_bp = Blueprint("sstv", __name__, url_prefix="/sstv") # ISS SSTV runs on a fixed downlink; allow a small entry tolerance so users # can type nearby values and still land on the canonical center frequency. -ISS_SSTV_MODULATION = 'fm' +ISS_SSTV_MODULATION = "fm" ISS_SSTV_FREQUENCIES = (ISS_SSTV_FREQ,) ISS_SSTV_FREQ_TOLERANCE_MHZ = 0.05 @@ -59,7 +60,7 @@ _timescale_lock = threading.Lock() # Track which device is being used sstv_active_device: int | None = None -sstv_active_sdr_type: str = 'rtlsdr' +sstv_active_sdr_type: str = "rtlsdr" def _progress_callback(data: dict) -> None: @@ -82,7 +83,7 @@ def _normalize_iss_frequency(frequency_mhz: float) -> float | None: return None -@sstv_bp.route('/status') +@sstv_bp.route("/status") def get_status(): """ Get SSTV decoder status. @@ -94,24 +95,24 @@ def get_status(): decoder = get_sstv_decoder() result = { - 'available': available, - 'decoder': decoder.decoder_available, - 'running': decoder.is_running, - 'iss_frequency': ISS_SSTV_FREQ, - 'modulation': ISS_SSTV_MODULATION, - 'image_count': len(decoder.get_images()), - 'doppler_enabled': decoder.doppler_enabled, + "available": available, + "decoder": decoder.decoder_available, + "running": decoder.is_running, + "iss_frequency": ISS_SSTV_FREQ, + "modulation": ISS_SSTV_MODULATION, + "image_count": len(decoder.get_images()), + "doppler_enabled": decoder.doppler_enabled, } # Include Doppler info if available doppler_info = decoder.last_doppler_info if doppler_info: - result['doppler'] = doppler_info.to_dict() + result["doppler"] = doppler_info.to_dict() return jsonify(result) -@sstv_bp.route('/start', methods=['POST']) +@sstv_bp.route("/start", methods=["POST"]) def start_decoder(): """ Start SSTV decoder. @@ -133,20 +134,24 @@ def start_decoder(): JSON with start status. """ if not is_sstv_available(): - return jsonify({ - 'status': 'error', - 'message': 'SSTV decoder not available. Install numpy and Pillow: pip install numpy Pillow' - }), 400 + return jsonify( + { + "status": "error", + "message": "SSTV decoder not available. Install numpy and Pillow: pip install numpy Pillow", + } + ), 400 decoder = get_sstv_decoder() if decoder.is_running: - return jsonify({ - 'status': 'already_running', - 'frequency': ISS_SSTV_FREQ, - 'modulation': ISS_SSTV_MODULATION, - 'doppler_enabled': decoder.doppler_enabled - }) + return jsonify( + { + "status": "already_running", + "frequency": ISS_SSTV_FREQ, + "modulation": ISS_SSTV_MODULATION, + "doppler_enabled": decoder.doppler_enabled, + } + ) # Clear queue while not _sstv_queue.empty(): @@ -157,43 +162,38 @@ def start_decoder(): # Get parameters data = request.get_json(silent=True) or {} - sdr_type_str = data.get('sdr_type', 'rtlsdr') + sdr_type_str = data.get("sdr_type", "rtlsdr") - if sdr_type_str != 'rtlsdr': - return jsonify({ - 'status': 'error', - 'message': f'{sdr_type_str.replace("_", " ").title()} is not yet supported for this mode. Please use an RTL-SDR device.' - }), 400 + if sdr_type_str != "rtlsdr": + return jsonify( + { + "status": "error", + "message": f"{sdr_type_str.replace('_', ' ').title()} is not yet supported for this mode. Please use an RTL-SDR device.", + } + ), 400 - frequency = data.get('frequency', ISS_SSTV_FREQ) - modulation = str(data.get('modulation', ISS_SSTV_MODULATION)).strip().lower() - device_index = data.get('device', 0) - latitude = data.get('latitude') - longitude = data.get('longitude') + frequency = data.get("frequency", ISS_SSTV_FREQ) + modulation = str(data.get("modulation", ISS_SSTV_MODULATION)).strip().lower() + device_index = data.get("device", 0) + latitude = data.get("latitude") + longitude = data.get("longitude") # Validate modulation (ISS mode is FM-only) if modulation != ISS_SSTV_MODULATION: - return jsonify({ - 'status': 'error', - 'message': f'Modulation must be {ISS_SSTV_MODULATION} for ISS SSTV mode' - }), 400 + return jsonify( + {"status": "error", "message": f"Modulation must be {ISS_SSTV_MODULATION} for ISS SSTV mode"} + ), 400 # Validate frequency try: frequency = float(frequency) normalized_frequency = _normalize_iss_frequency(frequency) if normalized_frequency is None: - supported = ', '.join(f'{freq:.3f}' for freq in ISS_SSTV_FREQUENCIES) - return jsonify({ - 'status': 'error', - 'message': f'Supported ISS SSTV frequency: {supported} MHz FM' - }), 400 + supported = ", ".join(f"{freq:.3f}" for freq in ISS_SSTV_FREQUENCIES) + return jsonify({"status": "error", "message": f"Supported ISS SSTV frequency: {supported} MHz FM"}), 400 frequency = normalized_frequency except (TypeError, ValueError): - return jsonify({ - 'status': 'error', - 'message': 'Invalid frequency' - }), 400 + return jsonify({"status": "error", "message": "Invalid frequency"}), 400 # Validate location if provided if latitude is not None and longitude is not None: @@ -201,20 +201,11 @@ def start_decoder(): latitude = float(latitude) longitude = float(longitude) if not (-90 <= latitude <= 90): - return jsonify({ - 'status': 'error', - 'message': 'Latitude must be between -90 and 90' - }), 400 + return jsonify({"status": "error", "message": "Latitude must be between -90 and 90"}), 400 if not (-180 <= longitude <= 180): - return jsonify({ - 'status': 'error', - 'message': 'Longitude must be between -180 and 180' - }), 400 + return jsonify({"status": "error", "message": "Longitude must be between -180 and 180"}), 400 except (TypeError, ValueError): - return jsonify({ - 'status': 'error', - 'message': 'Invalid latitude or longitude' - }), 400 + return jsonify({"status": "error", "message": "Invalid latitude or longitude"}), 400 else: latitude = None longitude = None @@ -222,13 +213,9 @@ def start_decoder(): # Claim SDR device global sstv_active_device, sstv_active_sdr_type device_int = int(device_index) - error = app_module.claim_sdr_device(device_int, 'sstv', sdr_type_str) + error = app_module.claim_sdr_device(device_int, "sstv", sdr_type_str) if error: - return jsonify({ - 'status': 'error', - 'error_type': 'DEVICE_BUSY', - 'message': error - }), 409 + return jsonify({"status": "error", "error_type": "DEVICE_BUSY", "message": error}), 409 # Set callback and start decoder.set_callback(_progress_callback) @@ -245,28 +232,25 @@ def start_decoder(): sstv_active_sdr_type = sdr_type_str result = { - 'status': 'started', - 'frequency': frequency, - 'modulation': ISS_SSTV_MODULATION, - 'device': device_index, - 'doppler_enabled': decoder.doppler_enabled + "status": "started", + "frequency": frequency, + "modulation": ISS_SSTV_MODULATION, + "device": device_index, + "doppler_enabled": decoder.doppler_enabled, } # Include initial Doppler info if available if decoder.doppler_enabled and decoder.last_doppler_info: - result['doppler'] = decoder.last_doppler_info.to_dict() + result["doppler"] = decoder.last_doppler_info.to_dict() return jsonify(result) else: # Release device on failure app_module.release_sdr_device(device_int, sdr_type_str) - return jsonify({ - 'status': 'error', - 'message': 'Failed to start decoder' - }), 500 + return jsonify({"status": "error", "message": "Failed to start decoder"}), 500 -@sstv_bp.route('/stop', methods=['POST']) +@sstv_bp.route("/stop", methods=["POST"]) def stop_decoder(): """ Stop SSTV decoder. @@ -283,10 +267,10 @@ def stop_decoder(): app_module.release_sdr_device(sstv_active_device, sstv_active_sdr_type) sstv_active_device = None - return jsonify({'status': 'stopped'}) + return jsonify({"status": "stopped"}) -@sstv_bp.route('/doppler') +@sstv_bp.route("/doppler") def get_doppler(): """ Get current Doppler shift information. @@ -299,27 +283,28 @@ def get_doppler(): decoder = get_sstv_decoder() if not decoder.doppler_enabled: - return jsonify({ - 'status': 'disabled', - 'message': 'Doppler tracking not enabled. Provide latitude/longitude when starting decoder.' - }) + return jsonify( + { + "status": "disabled", + "message": "Doppler tracking not enabled. Provide latitude/longitude when starting decoder.", + } + ) doppler_info = decoder.last_doppler_info if not doppler_info: - return jsonify({ - 'status': 'unavailable', - 'message': 'Doppler data not yet available' - }) + return jsonify({"status": "unavailable", "message": "Doppler data not yet available"}) - return jsonify({ - 'status': 'ok', - 'doppler': doppler_info.to_dict(), - 'nominal_frequency_mhz': ISS_SSTV_FREQ, - 'corrected_frequency_mhz': doppler_info.frequency_hz / 1_000_000 - }) + return jsonify( + { + "status": "ok", + "doppler": doppler_info.to_dict(), + "nominal_frequency_mhz": ISS_SSTV_FREQ, + "corrected_frequency_mhz": doppler_info.frequency_hz / 1_000_000, + } + ) -@sstv_bp.route('/images') +@sstv_bp.route("/images") def list_images(): """ Get list of decoded SSTV images. @@ -333,18 +318,14 @@ def list_images(): decoder = get_sstv_decoder() images = decoder.get_images() - limit = request.args.get('limit', type=int) + limit = request.args.get("limit", type=int) if limit and limit > 0: images = images[-limit:] - return jsonify({ - 'status': 'ok', - 'images': [img.to_dict() for img in images], - 'count': len(images) - }) + return jsonify({"status": "ok", "images": [img.to_dict() for img in images], "count": len(images)}) -@sstv_bp.route('/images/') +@sstv_bp.route("/images/") def get_image(filename: str): """ Get a decoded SSTV image file. @@ -358,22 +339,22 @@ def get_image(filename: str): decoder = get_sstv_decoder() # Security: only allow alphanumeric filenames with .png extension - if not filename.replace('_', '').replace('-', '').replace('.', '').isalnum(): - return api_error('Invalid filename', 400) + if not filename.replace("_", "").replace("-", "").replace(".", "").isalnum(): + return api_error("Invalid filename", 400) - if not filename.endswith('.png'): - return api_error('Only PNG files supported', 400) + if not filename.endswith(".png"): + return api_error("Only PNG files supported", 400) # Find image in decoder's output directory image_path = decoder._output_dir / filename if not image_path.exists(): - return api_error('Image not found', 404) + return api_error("Image not found", 404) - return send_file(image_path, mimetype='image/png') + return send_file(image_path, mimetype="image/png") -@sstv_bp.route('/images//download') +@sstv_bp.route("/images//download") def download_image(filename: str): """ Download a decoded SSTV image file. @@ -387,21 +368,21 @@ def download_image(filename: str): decoder = get_sstv_decoder() # Security: only allow alphanumeric filenames with .png extension - if not filename.replace('_', '').replace('-', '').replace('.', '').isalnum(): - return api_error('Invalid filename', 400) + if not filename.replace("_", "").replace("-", "").replace(".", "").isalnum(): + return api_error("Invalid filename", 400) - if not filename.endswith('.png'): - return api_error('Only PNG files supported', 400) + if not filename.endswith(".png"): + return api_error("Only PNG files supported", 400) image_path = decoder._output_dir / filename if not image_path.exists(): - return api_error('Image not found', 404) + return api_error("Image not found", 404) - return send_file(image_path, mimetype='image/png', as_attachment=True, download_name=filename) + return send_file(image_path, mimetype="image/png", as_attachment=True, download_name=filename) -@sstv_bp.route('/images/', methods=['DELETE']) +@sstv_bp.route("/images/", methods=["DELETE"]) def delete_image(filename: str): """ Delete a decoded SSTV image. @@ -415,19 +396,19 @@ def delete_image(filename: str): decoder = get_sstv_decoder() # Security: only allow alphanumeric filenames with .png extension - if not filename.replace('_', '').replace('-', '').replace('.', '').isalnum(): - return api_error('Invalid filename', 400) + if not filename.replace("_", "").replace("-", "").replace(".", "").isalnum(): + return api_error("Invalid filename", 400) - if not filename.endswith('.png'): - return api_error('Only PNG files supported', 400) + if not filename.endswith(".png"): + return api_error("Only PNG files supported", 400) if decoder.delete_image(filename): - return jsonify({'status': 'ok'}) + return jsonify({"status": "ok"}) else: - return api_error('Image not found', 404) + return api_error("Image not found", 404) -@sstv_bp.route('/images', methods=['DELETE']) +@sstv_bp.route("/images", methods=["DELETE"]) def delete_all_images(): """ Delete all decoded SSTV images. @@ -437,10 +418,10 @@ def delete_all_images(): """ decoder = get_sstv_decoder() count = decoder.delete_all_images() - return jsonify({'status': 'ok', 'deleted': count}) + return jsonify({"status": "ok", "deleted": count}) -@sstv_bp.route('/stream') +@sstv_bp.route("/stream") def stream_progress(): """ SSE stream of SSTV decode progress. @@ -453,36 +434,38 @@ def stream_progress(): Returns: SSE stream (text/event-stream) """ + def _on_msg(msg: dict[str, Any]) -> None: - process_event('sstv', msg, msg.get('type')) + process_event("sstv", msg, msg.get("type")) response = Response( sse_stream_fanout( source_queue=_sstv_queue, - channel_key='sstv', + channel_key="sstv", timeout=1.0, keepalive_interval=30.0, on_message=_on_msg, ), - mimetype='text/event-stream', + mimetype="text/event-stream", ) - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' - response.headers['Connection'] = 'keep-alive' + response.headers["Cache-Control"] = "no-cache" + response.headers["X-Accel-Buffering"] = "no" + response.headers["Connection"] = "keep-alive" return response -def _get_timescale(): - """Return a cached skyfield timescale (expensive to create).""" - global _timescale - with _timescale_lock: - if _timescale is None: - from skyfield.api import load - _timescale = load.timescale(builtin=True) - return _timescale +def _get_timescale(): + """Return a cached skyfield timescale (expensive to create).""" + global _timescale + with _timescale_lock: + if _timescale is None: + from skyfield.api import load + + _timescale = load.timescale(builtin=True) + return _timescale -@sstv_bp.route('/iss-schedule') +@sstv_bp.route("/iss-schedule") def iss_schedule(): """ Get ISS pass schedule for SSTV reception. @@ -500,24 +483,23 @@ def iss_schedule(): """ global _iss_schedule_cache, _iss_schedule_cache_time, _iss_schedule_cache_key - lat = request.args.get('latitude', type=float) - lon = request.args.get('longitude', type=float) - hours = request.args.get('hours', 48, type=int) + lat = request.args.get("latitude", type=float) + lon = request.args.get("longitude", type=float) + hours = request.args.get("hours", 48, type=int) if lat is None or lon is None: - return jsonify({ - 'status': 'error', - 'message': 'latitude and longitude parameters required' - }), 400 + return jsonify({"status": "error", "message": "latitude and longitude parameters required"}), 400 # Cache key: rounded lat/lon (1 decimal place) so nearby locations share cache cache_key = f"{round(lat, 1)}:{round(lon, 1)}:{hours}" with _iss_schedule_lock: now = time.time() - if (_iss_schedule_cache is not None - and cache_key == _iss_schedule_cache_key - and (now - _iss_schedule_cache_time) < ISS_SCHEDULE_CACHE_TTL): + if ( + _iss_schedule_cache is not None + and cache_key == _iss_schedule_cache_key + and (now - _iss_schedule_cache_time) < ISS_SCHEDULE_CACHE_TTL + ): return jsonify(_iss_schedule_cache) try: @@ -526,15 +508,10 @@ def iss_schedule(): from skyfield.almanac import find_discrete from skyfield.api import EarthSatellite, wgs84 - from data.satellites import TLE_SATELLITES - - # Get ISS TLE - iss_tle = TLE_SATELLITES.get('ISS') + # Get ISS TLE from live cache (kept fresh by auto-refresh) + iss_tle = get_cached_tle("ISS") if not iss_tle: - return jsonify({ - 'status': 'error', - 'message': 'ISS TLE data not available' - }), 500 + return jsonify({"status": "error", "message": "ISS TLE data not available"}), 500 ts = _get_timescale() satellite = EarthSatellite(iss_tle[1], iss_tle[2], iss_tle[0], ts) @@ -549,7 +526,7 @@ def iss_schedule(): alt, _, _ = topocentric.altaz() return alt.degrees > 0 - above_horizon.step_days = 1/720 + above_horizon.step_days = 1 / 720 times, events = find_discrete(t0, t1, above_horizon) @@ -588,23 +565,25 @@ def iss_schedule(): max_el = alt.degrees if max_el >= 10: # Min elevation filter - passes.append({ - 'satellite': 'ISS', - 'startTime': rise_time.utc_datetime().strftime('%Y-%m-%d %H:%M UTC'), - 'startTimeISO': rise_time.utc_datetime().isoformat(), - 'maxEl': round(max_el, 1), - 'duration': duration_minutes, - 'color': '#00ffff' - }) + passes.append( + { + "satellite": "ISS", + "startTime": rise_time.utc_datetime().strftime("%Y-%m-%d %H:%M UTC"), + "startTimeISO": rise_time.utc_datetime().isoformat(), + "maxEl": round(max_el, 1), + "duration": duration_minutes, + "color": "#00ffff", + } + ) i += 1 result = { - 'status': 'ok', - 'passes': passes, - 'count': len(passes), - 'sstv_frequency': ISS_SSTV_FREQ, - 'note': 'ISS SSTV events are not continuous. Check ARISS.org for scheduled events.' + "status": "ok", + "passes": passes, + "count": len(passes), + "sstv_frequency": ISS_SSTV_FREQ, + "note": "ISS SSTV events are not continuous. Check ARISS.org for scheduled events.", } # Update cache @@ -616,17 +595,11 @@ def iss_schedule(): return jsonify(result) except ImportError: - return jsonify({ - 'status': 'error', - 'message': 'skyfield library not installed' - }), 503 + return jsonify({"status": "error", "message": "skyfield library not installed"}), 503 except Exception as e: logger.error(f"Error getting ISS schedule: {e}") - return jsonify({ - 'status': 'error', - 'message': str(e) - }), 500 + return jsonify({"status": "error", "message": str(e)}), 500 def _fetch_iss_position() -> dict | None: @@ -644,14 +617,14 @@ def _fetch_iss_position() -> dict | None: # Try primary API: Where The ISS At try: - response = requests.get('https://api.wheretheiss.at/v1/satellites/25544', timeout=3) + response = requests.get("https://api.wheretheiss.at/v1/satellites/25544", timeout=3) if response.status_code == 200: data = response.json() cached = { - 'lat': float(data['latitude']), - 'lon': float(data['longitude']), - 'altitude': float(data.get('altitude', 420)), - 'source': 'wheretheiss', + "lat": float(data["latitude"]), + "lon": float(data["longitude"]), + "altitude": float(data.get("altitude", 420)), + "source": "wheretheiss", } except Exception as e: logger.warning(f"Where The ISS At API failed: {e}") @@ -659,15 +632,15 @@ def _fetch_iss_position() -> dict | None: # Try fallback API: Open Notify if cached is None: try: - response = requests.get('http://api.open-notify.org/iss-now.json', timeout=3) + response = requests.get("http://api.open-notify.org/iss-now.json", timeout=3) if response.status_code == 200: data = response.json() - if data.get('message') == 'success': + if data.get("message") == "success": cached = { - 'lat': float(data['iss_position']['latitude']), - 'lon': float(data['iss_position']['longitude']), - 'altitude': 420, - 'source': 'open-notify', + "lat": float(data["iss_position"]["latitude"]), + "lon": float(data["iss_position"]["longitude"]), + "altitude": 420, + "source": "open-notify", } except Exception as e: logger.warning(f"Open Notify API failed: {e}") @@ -680,7 +653,7 @@ def _fetch_iss_position() -> dict | None: return cached -@sstv_bp.route('/iss-position') +@sstv_bp.route("/iss-position") def iss_position(): """ Get current ISS position from real-time API. @@ -698,28 +671,25 @@ def iss_position(): """ from datetime import datetime - observer_lat = request.args.get('latitude', type=float) - observer_lon = request.args.get('longitude', type=float) + observer_lat = request.args.get("latitude", type=float) + observer_lon = request.args.get("longitude", type=float) pos = _fetch_iss_position() if pos is None: - return jsonify({ - 'status': 'error', - 'message': 'Unable to fetch ISS position from real-time APIs' - }), 503 + return jsonify({"status": "error", "message": "Unable to fetch ISS position from real-time APIs"}), 503 result = { - 'status': 'ok', - 'lat': pos['lat'], - 'lon': pos['lon'], - 'altitude': pos['altitude'], - 'timestamp': datetime.utcnow().isoformat(), - 'source': pos['source'], + "status": "ok", + "lat": pos["lat"], + "lon": pos["lon"], + "altitude": pos["altitude"], + "timestamp": datetime.utcnow().isoformat(), + "source": pos["source"], } # Calculate observer-relative data if location provided if observer_lat is not None and observer_lon is not None: - result.update(_calculate_observer_data(pos['lat'], pos['lon'], observer_lat, observer_lon)) + result.update(_calculate_observer_data(pos["lat"], pos["lon"], observer_lat, observer_lon)) return jsonify(result) @@ -743,7 +713,7 @@ def _calculate_observer_data(iss_lat: float, iss_lon: float, obs_lat: float, obs # Haversine for ground distance dlat = lat2 - lat1 dlon = lon2 - lon1 - a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2 + a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2 c = 2 * math.asin(math.sqrt(a)) ground_distance = earth_radius * c @@ -763,14 +733,60 @@ def _calculate_observer_data(iss_lat: float, iss_lon: float, obs_lat: float, obs azimuth = math.degrees(math.atan2(y, x)) azimuth = (azimuth + 360) % 360 - return { - 'elevation': round(elevation, 1), - 'azimuth': round(azimuth, 1), - 'distance': round(slant_range, 1) - } + return {"elevation": round(elevation, 1), "azimuth": round(azimuth, 1), "distance": round(slant_range, 1)} -@sstv_bp.route('/decode-file', methods=['POST']) +@sstv_bp.route("/iss-track") +def iss_track(): + """ + Return ISS ground track points propagated from TLE data. + + Uses skyfield SGP4 propagation over ±90 minutes (roughly one full orbit) + to produce an accurate track that accounts for Earth's rotation. + + Returns: + JSON with list of {lat, lon, past} points. + """ + try: + from datetime import timedelta + + from skyfield.api import EarthSatellite, wgs84 + + iss_tle = get_cached_tle("ISS") + if not iss_tle: + return jsonify({"status": "error", "message": "ISS TLE not available"}), 500 + + ts = _get_timescale() + satellite = EarthSatellite(iss_tle[1], iss_tle[2], iss_tle[0], ts) + now = ts.now() + now_dt = now.utc_datetime() + + track = [] + for minutes_offset in range(-90, 91, 1): + t_point = ts.utc(now_dt + timedelta(minutes=minutes_offset)) + try: + geo = satellite.at(t_point) + sp = wgs84.subpoint(geo) + track.append( + { + "lat": round(float(sp.latitude.degrees), 4), + "lon": round(float(sp.longitude.degrees), 4), + "past": minutes_offset < 0, + } + ) + except Exception: + continue + + return jsonify({"status": "ok", "track": track}) + + except ImportError: + return jsonify({"status": "error", "message": "skyfield not installed"}), 503 + except Exception as e: + logger.error(f"Error computing ISS track: {e}") + return jsonify({"status": "error", "message": str(e)}), 500 + + +@sstv_bp.route("/decode-file", methods=["POST"]) def decode_file(): """ Decode SSTV from an uploaded audio file. @@ -780,23 +796,18 @@ def decode_file(): Returns: JSON with decoded images. """ - if 'audio' not in request.files: - return jsonify({ - 'status': 'error', - 'message': 'No audio file provided' - }), 400 + if "audio" not in request.files: + return jsonify({"status": "error", "message": "No audio file provided"}), 400 - audio_file = request.files['audio'] + audio_file = request.files["audio"] if not audio_file.filename: - return jsonify({ - 'status': 'error', - 'message': 'No file selected' - }), 400 + return jsonify({"status": "error", "message": "No file selected"}), 400 # Save to temp file import tempfile - with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp: + + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: audio_file.save(tmp.name) tmp_path = tmp.name @@ -804,18 +815,11 @@ def decode_file(): decoder = get_sstv_decoder() images = decoder.decode_file(tmp_path) - return jsonify({ - 'status': 'ok', - 'images': [img.to_dict() for img in images], - 'count': len(images) - }) + return jsonify({"status": "ok", "images": [img.to_dict() for img in images], "count": len(images)}) except Exception as e: logger.error(f"Error decoding file: {e}") - return jsonify({ - 'status': 'error', - 'message': str(e) - }), 500 + return jsonify({"status": "error", "message": str(e)}), 500 finally: # Clean up temp file diff --git a/static/js/modes/sstv.js b/static/js/modes/sstv.js index c7b5d25..c70e76e 100644 --- a/static/js/modes/sstv.js +++ b/static/js/modes/sstv.js @@ -13,12 +13,14 @@ const SSTV = (function() { let issMap = null; let issMarker = null; let issTrackLine = null; + let issTrackPast = null; let issPosition = null; - let issUpdateInterval = null; - let countdownInterval = null; - let nextPassData = null; - let pendingMapInvalidate = false; - let locationListenersAttached = false; + let issUpdateInterval = null; + let issTrackInterval = null; + let countdownInterval = null; + let nextPassData = null; + let pendingMapInvalidate = false; + let locationListenersAttached = false; // ISS frequency const ISS_FREQ = 145.800; @@ -93,12 +95,12 @@ const SSTV = (function() { if (latInput && storedLat) latInput.value = storedLat; if (lonInput && storedLon) lonInput.value = storedLon; - if (!locationListenersAttached) { - if (latInput) latInput.addEventListener('change', saveLocationFromInputs); - if (lonInput) lonInput.addEventListener('change', saveLocationFromInputs); - locationListenersAttached = true; - } - } + if (!locationListenersAttached) { + if (latInput) latInput.addEventListener('change', saveLocationFromInputs); + if (lonInput) lonInput.addEventListener('change', saveLocationFromInputs); + locationListenersAttached = true; + } + } /** * Save location from input fields @@ -250,12 +252,19 @@ const SSTV = (function() { // Create ISS marker (will be positioned when we get data) issMarker = L.marker([0, 0], { icon: issIcon }).addTo(issMap); - // Create ground track line + // Past track (dimmer, solid) + issTrackPast = L.polyline([], { + color: '#00d4ff', + weight: 1.5, + opacity: 0.3, + }).addTo(issMap); + + // Future track (brighter, dashed) issTrackLine = L.polyline([], { color: '#00d4ff', weight: 2, - opacity: 0.6, - dashArray: '5, 5' + opacity: 0.7, + dashArray: '6, 4' }).addTo(issMap); issMap.on('resize moveend zoomend', () => { @@ -272,9 +281,12 @@ const SSTV = (function() { */ function startIssTracking() { updateIssPosition(); - // Update every 5 seconds + updateIssTrack(); if (issUpdateInterval) clearInterval(issUpdateInterval); issUpdateInterval = setInterval(updateIssPosition, 5000); + // Track refreshes every 5 minutes — one orbit is ~93 min so this keeps it current + if (issTrackInterval) clearInterval(issTrackInterval); + issTrackInterval = setInterval(updateIssTrack, 5 * 60 * 1000); } /** @@ -285,6 +297,52 @@ const SSTV = (function() { clearInterval(issUpdateInterval); issUpdateInterval = null; } + if (issTrackInterval) { + clearInterval(issTrackInterval); + issTrackInterval = null; + } + } + + /** + * Fetch and render the ISS ground track from the backend (TLE-propagated). + */ + async function updateIssTrack() { + try { + const response = await fetch('/sstv/iss-track'); + const data = await response.json(); + if (data.status !== 'ok' || !issTrackLine || !issTrackPast) return; + + const pastPts = [], futurePts = []; + for (const pt of data.track) { + (pt.past ? pastPts : futurePts).push([pt.lat, pt.lon]); + } + + // Split future track at antimeridian crossings to avoid long horizontal lines + const futureSegments = _splitAtAntimeridian(futurePts); + const pastSegments = _splitAtAntimeridian(pastPts); + + issTrackLine.setLatLngs(futureSegments); + issTrackPast.setLatLngs(pastSegments); + } catch (err) { + console.error('Failed to fetch ISS track:', err); + } + } + + /** + * Split an array of [lat, lon] points into segments at antimeridian crossings. + */ + function _splitAtAntimeridian(points) { + const segments = []; + let current = []; + for (let i = 0; i < points.length; i++) { + if (i > 0 && Math.abs(points[i][1] - points[i - 1][1]) > 180) { + if (current.length > 1) segments.push(current); + current = []; + } + current.push(points[i]); + } + if (current.length > 1) segments.push(current); + return segments; } /** @@ -486,55 +544,7 @@ const SSTV = (function() { issMarker.setLatLng([lat, lon]); } - // Calculate and draw ground track - if (issTrackLine) { - const trackPoints = []; - const inclination = 51.6; // ISS orbital inclination in degrees - - // Generate orbit track points - for (let offset = -180; offset <= 180; offset += 3) { - let trackLon = lon + offset; - - // Normalize longitude - while (trackLon > 180) trackLon -= 360; - while (trackLon < -180) trackLon += 360; - - // Calculate latitude based on orbital inclination - const phase = (offset / 360) * 2 * Math.PI; - const currentPhase = Math.asin(Math.max(-1, Math.min(1, lat / inclination))); - let trackLat = inclination * Math.sin(phase + currentPhase); - - // Clamp to valid range - trackLat = Math.max(-inclination, Math.min(inclination, trackLat)); - - trackPoints.push([trackLat, trackLon]); - } - - // Split track at antimeridian to avoid line across map - const segments = []; - let currentSegment = []; - - for (let i = 0; i < trackPoints.length; i++) { - if (i > 0) { - const prevLon = trackPoints[i - 1][1]; - const currLon = trackPoints[i][1]; - if (Math.abs(currLon - prevLon) > 180) { - // Crossed antimeridian - if (currentSegment.length > 0) { - segments.push(currentSegment); - } - currentSegment = []; - } - } - currentSegment.push(trackPoints[i]); - } - if (currentSegment.length > 0) { - segments.push(currentSegment); - } - - // Use only the longest segment or combine if needed - issTrackLine.setLatLngs(segments.length > 0 ? segments : []); - } + // Track is fetched separately by updateIssTrack() via /sstv/iss-track // Pan map to follow ISS only when the map pane is currently renderable. if (isMapContainerVisible()) {