Fix setup.sh hanging on Python 3.14/macOS and add satellite enhancements

- Add --no-cache-dir and --timeout 120 to all pip calls to prevent hanging
  on corrupt/stale pip HTTP cache (cachecontrol .pyc issue)
- Replace silent python -c import verification with pip show to avoid
  import-time side effects hanging the installer
- Switch optional packages to --only-binary :all: to skip source compilation
  on Python versions without pre-built wheels (prevents gevent/numpy hangs)
- Warn early when Python 3.13+ is detected that some packages may be skipped
- Add ground track caching with 30-minute TTL to satellite route
- Add live satellite position tracker background thread via SSE fanout
- Add satellite_predict, satellite_telemetry, and satnogs utilities

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-03-18 11:09:00 +00:00
parent 3140f54419
commit dc84e933c1
9 changed files with 1497 additions and 440 deletions
+210
View File
@@ -0,0 +1,210 @@
"""Shared satellite pass prediction utility.
Used by both the satellite tracking dashboard and the weather satellite scheduler.
Uses Skyfield's find_events() for accurate AOS/TCA/LOS event detection.
"""
from __future__ import annotations
import datetime
import math
from typing import Any
from utils.logging import get_logger
logger = get_logger('intercept.satellite_predict')
def predict_passes(
tle_data: tuple,
observer, # skyfield wgs84.latlon object
ts, # skyfield timescale
t0, # skyfield Time start
t1, # skyfield Time end
min_el: float = 10.0,
include_trajectory: bool = True,
include_ground_track: bool = True,
) -> list[dict[str, Any]]:
"""Predict satellite passes over an observer location.
Args:
tle_data: (name, line1, line2) tuple
observer: Skyfield wgs84.latlon observer
ts: Skyfield timescale
t0: Start time (Skyfield Time)
t1: End time (Skyfield Time)
min_el: Minimum peak elevation in degrees to include pass
include_trajectory: Include 30-point az/el trajectory for polar plot
include_ground_track: Include 60-point lat/lon ground track for map
Returns:
List of pass dicts sorted by AOS time. Each dict contains:
aosTime, aosAz, aosEl,
tcaTime, tcaEl, tcaAz,
losTime, losAz, losEl,
duration (minutes, float),
startTime (human-readable UTC),
startTimeISO (ISO string),
endTimeISO (ISO string),
maxEl (float, same as tcaEl),
trajectory (list of {az, el} if include_trajectory),
groundTrack (list of {lat, lon} if include_ground_track)
"""
from skyfield.api import EarthSatellite, wgs84
# Filter decaying satellites by checking ndot from TLE line1 chars 33-43
try:
line1 = tle_data[1]
ndot_str = line1[33:43].strip()
ndot = float(ndot_str)
if abs(ndot) > 0.01:
logger.debug(
'Skipping decaying satellite %s (ndot=%s)', tle_data[0], ndot
)
return []
except (ValueError, IndexError):
# Don't skip on parse error
pass
# Create EarthSatellite object
try:
satellite = EarthSatellite(tle_data[1], tle_data[2], tle_data[0], ts)
except Exception as exc:
logger.debug('Failed to create EarthSatellite for %s: %s', tle_data[0], exc)
return []
# Find events using Skyfield's native find_events()
# Event types: 0=AOS, 1=TCA, 2=LOS
try:
times, events = satellite.find_events(
observer, t0, t1, altitude_degrees=min_el
)
except Exception as exc:
logger.debug('find_events failed for %s: %s', tle_data[0], exc)
return []
# Group events into AOS->TCA->LOS triplets
passes = []
i = 0
total = len(events)
# Skip any leading non-AOS events (satellite already above horizon at t0)
while i < total and events[i] != 0:
i += 1
while i < total:
# Expect AOS (0)
if events[i] != 0:
i += 1
continue
aos_time = times[i]
i += 1
# Collect TCA and LOS, watching for premature next AOS
tca_time = None
los_time = None
while i < total and events[i] != 0:
if events[i] == 1:
tca_time = times[i]
elif events[i] == 2:
los_time = times[i]
i += 1
# Must have both AOS and LOS to form a valid pass
if los_time is None:
# Incomplete pass — skip
continue
# If TCA is missing, derive from midpoint between AOS and LOS
if tca_time is None:
aos_tt = aos_time.tt
los_tt = los_time.tt
tca_time = ts.tt_jd((aos_tt + los_tt) / 2.0)
# Compute topocentric positions at AOS, TCA, LOS
try:
aos_topo = (satellite - observer).at(aos_time)
tca_topo = (satellite - observer).at(tca_time)
los_topo = (satellite - observer).at(los_time)
aos_alt, aos_az, _ = aos_topo.altaz()
tca_alt, tca_az, _ = tca_topo.altaz()
los_alt, los_az, _ = los_topo.altaz()
aos_dt = aos_time.utc_datetime()
tca_dt = tca_time.utc_datetime()
los_dt = los_time.utc_datetime()
duration = (los_dt - aos_dt).total_seconds() / 60.0
pass_dict: dict[str, Any] = {
'aosTime': aos_dt.isoformat(),
'aosAz': round(float(aos_az.degrees), 1),
'aosEl': round(float(aos_alt.degrees), 1),
'tcaTime': tca_dt.isoformat(),
'tcaAz': round(float(tca_az.degrees), 1),
'tcaEl': round(float(tca_alt.degrees), 1),
'losTime': los_dt.isoformat(),
'losAz': round(float(los_az.degrees), 1),
'losEl': round(float(los_alt.degrees), 1),
'duration': round(duration, 1),
# Backwards-compatible fields
'startTime': aos_dt.strftime('%Y-%m-%d %H:%M UTC'),
'startTimeISO': aos_dt.isoformat(),
'endTimeISO': los_dt.isoformat(),
'maxEl': round(float(tca_alt.degrees), 1),
}
# Build 30-point az/el trajectory for polar plot
if include_trajectory:
trajectory = []
for step in range(30):
frac = step / 29.0
t_pt = ts.tt_jd(
aos_time.tt + frac * (los_time.tt - aos_time.tt)
)
try:
pt_alt, pt_az, _ = (satellite - observer).at(t_pt).altaz()
trajectory.append({
'az': round(float(pt_az.degrees), 1),
'el': round(float(max(0.0, pt_alt.degrees)), 1),
})
except Exception as pt_exc:
logger.debug(
'Trajectory point error for %s: %s', tle_data[0], pt_exc
)
pass_dict['trajectory'] = trajectory
# Build 60-point lat/lon ground track for map
if include_ground_track:
ground_track = []
for step in range(60):
frac = step / 59.0
t_pt = ts.tt_jd(
aos_time.tt + frac * (los_time.tt - aos_time.tt)
)
try:
geocentric = satellite.at(t_pt)
subpoint = wgs84.subpoint(geocentric)
ground_track.append({
'lat': round(float(subpoint.latitude.degrees), 4),
'lon': round(float(subpoint.longitude.degrees), 4),
})
except Exception as gt_exc:
logger.debug(
'Ground track point error for %s: %s', tle_data[0], gt_exc
)
pass_dict['groundTrack'] = ground_track
passes.append(pass_dict)
except Exception as exc:
logger.debug(
'Failed to compute pass details for %s: %s', tle_data[0], exc
)
continue
passes.sort(key=lambda p: p['startTimeISO'])
return passes
+436
View File
@@ -0,0 +1,436 @@
"""Satellite telemetry packet parsers.
Provides pure-Python decoders for common amateur/CubeSat protocols:
- AX.25 (callsign-addressed frames)
- CSP (CubeSat Space Protocol)
- CCSDS TM (space packet primary header)
Also provides a PayloadAnalyzer that generates multi-interpretation
views of raw binary data (hex dump, float32, uint16/32, strings).
"""
from __future__ import annotations
import math
import struct
import string
from datetime import datetime
# ---------------------------------------------------------------------------
# AX.25 parser
# ---------------------------------------------------------------------------
def _decode_ax25_callsign(addr_bytes: bytes) -> str:
"""Decode a 7-byte AX.25 address field into a 'CALL-SSID' string.
The first 6 bytes encode the callsign (each ASCII character left-shifted
by 1 bit). The 7th byte encodes the SSID in bits 4-1.
Args:
addr_bytes: Exactly 7 bytes of raw address data.
Returns:
A callsign string such as ``"N0CALL-3"`` or ``"N0CALL"`` (no suffix
when SSID is 0).
"""
callsign = "".join(chr(b >> 1) for b in addr_bytes[:6]).rstrip()
ssid = (addr_bytes[6] >> 1) & 0x0F
return f"{callsign}-{ssid}" if ssid else callsign
def parse_ax25(data: bytes) -> dict | None:
"""Parse an AX.25 frame from raw bytes.
Decodes destination and source callsigns, optional repeater addresses,
control byte, optional PID byte, and payload.
Args:
data: Raw bytes of the AX.25 frame (without HDLC flags or FCS).
Returns:
A dict with parsed fields or ``None`` if the frame is too short or
cannot be decoded.
"""
try:
# Minimum: 7 (dest) + 7 (src) + 1 (control) = 15 bytes
if len(data) < 15:
return None
destination = _decode_ax25_callsign(data[0:7])
source = _decode_ax25_callsign(data[7:14])
# Walk repeater addresses. The H-bit (LSB of byte 6 in each address)
# being set means this is the last address in the chain.
offset = 14 # byte index of the last byte in the source field
repeaters: list[str] = []
if not (data[offset] & 0x01):
# More addresses follow; read up to 8 repeaters.
for _ in range(8):
rep_start = offset + 1
rep_end = rep_start + 7
if rep_end > len(data):
break
repeaters.append(_decode_ax25_callsign(data[rep_start:rep_end]))
offset = rep_end - 1 # last byte of this repeater field
if data[offset] & 0x01:
# H-bit set — this was the final address
break
# Control byte follows the last address field
ctrl_offset = offset + 1
if ctrl_offset >= len(data):
return None
control = data[ctrl_offset]
payload_offset = ctrl_offset + 1
# PID byte is present for I-frames (bits 0-1 == 0b00) and
# UI-frames (bits 0-5 == 0b000011). More generally: absent only
# for pure unnumbered frames where (control & 0x03) == 0x03 AND
# control is not 0x03 itself (UI).
pid: int | None = None
is_unnumbered = (control & 0x03) == 0x03
is_ui = control == 0x03
if not is_unnumbered or is_ui:
if payload_offset < len(data):
pid = data[payload_offset]
payload_offset += 1
payload = data[payload_offset:]
return {
"protocol": "AX.25",
"destination": destination,
"source": source,
"repeaters": repeaters,
"control": control,
"pid": pid,
"payload": payload,
"payload_hex": payload.hex(),
"payload_length": len(payload),
}
except Exception: # noqa: BLE001
return None
# ---------------------------------------------------------------------------
# CSP parser
# ---------------------------------------------------------------------------
def parse_csp(data: bytes) -> dict | None:
"""Parse a CSP v1 (CubeSat Space Protocol) header.
The first 4 bytes form a big-endian 32-bit header word with the
following bit layout::
bits 31-27 priority (5 bits)
bits 26-22 source (5 bits)
bits 21-17 destination (5 bits)
bits 16-12 dest_port (5 bits)
bits 11-6 src_port (6 bits)
bits 5-0 flags (6 bits)
Args:
data: Raw bytes starting from the CSP header.
Returns:
A dict with parsed CSP fields and payload, or ``None`` on failure.
"""
try:
if len(data) < 4:
return None
header: int = struct.unpack(">I", data[:4])[0]
priority = (header >> 27) & 0x1F
source = (header >> 22) & 0x1F
destination = (header >> 17) & 0x1F
dest_port = (header >> 12) & 0x1F
src_port = (header >> 6) & 0x3F
raw_flags = header & 0x3F
flags = {
"frag": bool(raw_flags & 0x10),
"hmac": bool(raw_flags & 0x08),
"xtea": bool(raw_flags & 0x04),
"rdp": bool(raw_flags & 0x02),
"crc": bool(raw_flags & 0x01),
}
payload = data[4:]
return {
"protocol": "CSP",
"priority": priority,
"source": source,
"destination": destination,
"dest_port": dest_port,
"src_port": src_port,
"flags": flags,
"payload": payload,
"payload_hex": payload.hex(),
"payload_length": len(payload),
}
except Exception: # noqa: BLE001
return None
# ---------------------------------------------------------------------------
# CCSDS parser
# ---------------------------------------------------------------------------
def parse_ccsds(data: bytes) -> dict | None:
"""Parse a CCSDS Space Packet primary header (6 bytes).
Header layout::
bytes 0-1: version (3 bits) | packet_type (1 bit) |
secondary_header_flag (1 bit) | APID (11 bits)
bytes 2-3: sequence_flags (2 bits) | sequence_count (14 bits)
bytes 4-5: data_length field (16 bits, = actual_payload_length - 1)
Args:
data: Raw bytes starting from the CCSDS primary header.
Returns:
A dict with parsed CCSDS fields and payload, or ``None`` on failure.
"""
try:
if len(data) < 6:
return None
word0: int = struct.unpack(">H", data[0:2])[0]
word1: int = struct.unpack(">H", data[2:4])[0]
word2: int = struct.unpack(">H", data[4:6])[0]
version = (word0 >> 13) & 0x07
packet_type = (word0 >> 12) & 0x01
secondary_header_flag = bool((word0 >> 11) & 0x01)
apid = word0 & 0x07FF
sequence_flags = (word1 >> 14) & 0x03
sequence_count = word1 & 0x3FFF
data_length = word2 # raw field; actual user data bytes = data_length + 1
payload = data[6:]
return {
"protocol": "CCSDS_TM",
"version": version,
"packet_type": packet_type,
"secondary_header": secondary_header_flag,
"apid": apid,
"sequence_flags": sequence_flags,
"sequence_count": sequence_count,
"data_length": data_length,
"payload": payload,
"payload_hex": payload.hex(),
"payload_length": len(payload),
}
except Exception: # noqa: BLE001
return None
# ---------------------------------------------------------------------------
# Payload analyzer
# ---------------------------------------------------------------------------
_PRINTABLE = set(string.printable) - set("\t\n\r\x0b\x0c")
def _hex_dump(data: bytes) -> str:
"""Format bytes as an annotated hex dump, 16 bytes per line.
Each line is formatted as::
OOOO: XX XX XX XX XX XX XX XX XX XX XX XX XX XX XX XX ASCII
where ``OOOO`` is the hex offset and ``ASCII`` shows printable characters
(non-printable replaced with ``'.'``).
Args:
data: Bytes to format.
Returns:
Multi-line hex dump string (trailing newline on each line).
"""
lines: list[str] = []
for row in range(0, len(data), 16):
chunk = data[row : row + 16]
# Build groups of 4 bytes separated by two spaces
groups: list[str] = []
for g in range(0, 16, 4):
group_bytes = chunk[g : g + 4]
groups.append(" ".join(f"{b:02X}" for b in group_bytes))
hex_part = " ".join(groups)
# Pad to fixed width: 16 bytes × 3 chars - 1 space + 3 group separators
# Maximum width: 11+2+11+2+11+2+11 = 50 chars; pad to 50
hex_part = hex_part.ljust(50)
ascii_part = "".join(chr(b) if chr(b) in _PRINTABLE else "." for b in chunk)
lines.append(f"{row:04X}: {hex_part} {ascii_part}\n")
return "".join(lines)
def _extract_strings(data: bytes, min_len: int = 3) -> list[str]:
"""Extract runs of printable ASCII characters of at least ``min_len``."""
results: list[str] = []
current: list[str] = []
for b in data:
ch = chr(b)
if ch in _PRINTABLE:
current.append(ch)
else:
if len(current) >= min_len:
results.append("".join(current))
current = []
if len(current) >= min_len:
results.append("".join(current))
return results
def analyze_payload(data: bytes) -> dict:
"""Generate a multi-interpretation analysis of raw bytes.
Produces a hex dump, several numeric/string interpretations, and a
list of heuristic observations about plausible sensor values.
Args:
data: Raw bytes to analyze.
Returns:
A dict containing ``hex_dump``, ``length``, ``interpretations``,
and ``heuristics`` keys. Never raises an exception.
"""
try:
hex_dump = _hex_dump(data)
length = len(data)
# --- float32 (little-endian) ---
float32_values: list[float] = []
for i in range(0, length - 3, 4):
(val,) = struct.unpack_from("<f", data, i)
if not math.isnan(val) and abs(val) <= 1e9:
float32_values.append(val)
# --- uint16 little-endian ---
uint16_values: list[int] = []
for i in range(0, length - 1, 2):
(val,) = struct.unpack_from("<H", data, i)
uint16_values.append(val)
# --- uint32 little-endian ---
uint32_values: list[int] = []
for i in range(0, length - 3, 4):
(val,) = struct.unpack_from("<I", data, i)
uint32_values.append(val)
# --- printable string runs ---
strings = _extract_strings(data, min_len=3)
interpretations = {
"float32": float32_values,
"uint16_le": uint16_values,
"uint32_le": uint32_values,
"strings": strings,
}
# --- heuristics ---
heuristics: list[str] = []
used_as_voltage: set[int] = set()
for idx, v in enumerate(float32_values):
# Voltage: small positive float
if 0.0 < v < 10.0:
heuristics.append(f"Possible voltage: {v:.3f} V (index {idx})")
used_as_voltage.add(idx)
for idx, v in enumerate(float32_values):
# Temperature: plausible range, not already flagged as voltage, not zero
if -50.0 < v < 120.0 and idx not in used_as_voltage and v != 0.0:
heuristics.append(f"Possible temperature: {v:.1f}°C (index {idx})")
for idx, v in enumerate(float32_values):
# Current: small positive float not already flagged as voltage
if 0.0 < v < 5.0 and idx not in used_as_voltage:
heuristics.append(f"Possible current: {v:.3f} A (index {idx})")
for idx, v in enumerate(float32_values):
# Unix timestamp: plausible range (roughly 20012033)
if 1_000_000_000.0 < v < 2_000_000_000.0:
ts = datetime.utcfromtimestamp(v)
heuristics.append(f"Possible Unix timestamp: {ts} (index {idx})")
return {
"hex_dump": hex_dump,
"length": length,
"interpretations": interpretations,
"heuristics": heuristics,
}
except Exception: # noqa: BLE001
# Guarantee a safe return even on completely malformed input
return {
"hex_dump": "",
"length": len(data) if isinstance(data, (bytes, bytearray)) else 0,
"interpretations": {"float32": [], "uint16_le": [], "uint32_le": [], "strings": []},
"heuristics": [],
}
# ---------------------------------------------------------------------------
# Auto-parser
# ---------------------------------------------------------------------------
def auto_parse(data: bytes) -> dict:
"""Attempt to decode a packet using each supported protocol in turn.
Tries parsers in priority order: CSP → CCSDS → AX.25. Returns the
first successful parse merged with a ``payload_analysis`` key produced
by :func:`analyze_payload`.
Args:
data: Raw bytes of the packet.
Returns:
A dict with parsed protocol fields plus ``payload_analysis``, or a
fallback dict with ``protocol: 'unknown'`` and a top-level
``analysis`` key if no parser succeeds.
"""
# CSP: 4-byte header minimum
if len(data) >= 4:
result = parse_csp(data)
if result is not None:
result["payload_analysis"] = analyze_payload(result["payload"])
return result
# CCSDS: 6-byte header minimum
if len(data) >= 6:
result = parse_ccsds(data)
if result is not None:
result["payload_analysis"] = analyze_payload(result["payload"])
return result
# AX.25: 15-byte frame minimum
if len(data) >= 15:
result = parse_ax25(data)
if result is not None:
result["payload_analysis"] = analyze_payload(result["payload"])
return result
# Nothing matched — return a raw analysis
return {
"protocol": "unknown",
"raw_hex": data.hex(),
"analysis": analyze_payload(data),
}
+145
View File
@@ -0,0 +1,145 @@
"""SatNOGS transmitter data.
Fetches downlink/uplink frequency data from the SatNOGS database,
keyed by NORAD ID. Cached for 24 hours to avoid hammering the API.
"""
from __future__ import annotations
import json
import threading
import time
import urllib.request
from utils.logging import get_logger
logger = get_logger("intercept.satnogs")
# ---------------------------------------------------------------------------
# Module-level cache
# ---------------------------------------------------------------------------
_transmitters: dict[int, list[dict]] = {}
_fetched_at: float = 0.0
_CACHE_TTL = 86400 # 24 hours in seconds
_fetch_lock = threading.Lock()
_SATNOGS_URL = "https://db.satnogs.org/api/transmitters/?format=json"
_REQUEST_TIMEOUT = 15 # seconds
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _hz_to_mhz(value: float | int | None) -> float | None:
"""Convert a frequency in Hz to MHz, returning None if value is None."""
if value is None:
return None
return float(value) / 1_000_000.0
def _safe_float(value: object) -> float | None:
"""Return a float or None, silently swallowing conversion errors."""
if value is None:
return None
try:
return float(value) # type: ignore[arg-type]
except (TypeError, ValueError):
return None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def fetch_transmitters() -> dict[int, list[dict]]:
"""Fetch transmitter records from the SatNOGS database API.
Makes a single HTTP GET to the SatNOGS transmitters endpoint, groups
results by NORAD catalogue ID, and converts all frequency fields from
Hz to MHz.
Returns:
A dict mapping NORAD ID (int) to a list of transmitter dicts.
Returns an empty dict on any network or parse error.
"""
try:
logger.info("Fetching SatNOGS transmitter data from %s", _SATNOGS_URL)
with urllib.request.urlopen(_SATNOGS_URL, timeout=_REQUEST_TIMEOUT) as resp:
raw = resp.read()
records: list[dict] = json.loads(raw)
grouped: dict[int, list[dict]] = {}
for item in records:
norad_id = item.get("norad_cat_id")
if norad_id is None:
continue
norad_id = int(norad_id)
entry: dict = {
"description": str(item.get("description") or ""),
"downlink_low": _hz_to_mhz(_safe_float(item.get("downlink_low"))),
"downlink_high": _hz_to_mhz(_safe_float(item.get("downlink_high"))),
"uplink_low": _hz_to_mhz(_safe_float(item.get("uplink_low"))),
"uplink_high": _hz_to_mhz(_safe_float(item.get("uplink_high"))),
"mode": str(item.get("mode") or ""),
"baud": _safe_float(item.get("baud")),
"status": str(item.get("status") or ""),
"type": str(item.get("type") or ""),
"service": str(item.get("service") or ""),
}
grouped.setdefault(norad_id, []).append(entry)
logger.info(
"SatNOGS fetch complete: %d satellites with transmitter data",
len(grouped),
)
return grouped
except Exception as exc: # noqa: BLE001
logger.warning("Failed to fetch SatNOGS transmitter data: %s", exc)
return {}
def get_transmitters(norad_id: int) -> list[dict]:
"""Return cached transmitter records for a given NORAD catalogue ID.
Refreshes the in-memory cache from the SatNOGS API when the cache is
empty or older than ``_CACHE_TTL`` seconds (24 hours).
Args:
norad_id: The NORAD catalogue ID of the satellite.
Returns:
A (possibly empty) list of transmitter dicts for that satellite.
"""
global _transmitters, _fetched_at # noqa: PLW0603
with _fetch_lock:
age = time.time() - _fetched_at
if not _transmitters or age > _CACHE_TTL:
_transmitters = fetch_transmitters()
_fetched_at = time.time()
return _transmitters.get(int(norad_id), [])
def refresh_transmitters() -> int:
"""Force-refresh the transmitter cache regardless of TTL.
Returns:
The number of satellites (unique NORAD IDs) with transmitter data
after the refresh.
"""
global _transmitters, _fetched_at # noqa: PLW0603
with _fetch_lock:
_transmitters = fetch_transmitters()
_fetched_at = time.time()
return len(_transmitters)
+126 -218
View File
@@ -1,218 +1,126 @@
"""Weather satellite pass prediction utility.
Shared prediction logic used by both the API endpoint and the auto-scheduler.
"""
from __future__ import annotations
import datetime
import time
from typing import Any
from utils.logging import get_logger
from utils.weather_sat import WEATHER_SATELLITES
logger = get_logger('intercept.weather_sat_predict')
# Cache skyfield timescale to avoid re-downloading/re-parsing per request
_cached_timescale = None
def _get_timescale():
global _cached_timescale
if _cached_timescale is None:
from skyfield.api import load
_cached_timescale = load.timescale()
return _cached_timescale
def _format_utc_iso(dt: datetime.datetime) -> str:
"""Return an ISO8601 UTC timestamp with a single timezone designator."""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=datetime.timezone.utc)
else:
dt = dt.astimezone(datetime.timezone.utc)
return dt.isoformat().replace('+00:00', 'Z')
def predict_passes(
lat: float,
lon: float,
hours: int = 24,
min_elevation: float = 15.0,
include_trajectory: bool = False,
include_ground_track: bool = False,
) -> list[dict[str, Any]]:
"""Predict upcoming weather satellite passes for an observer location.
Args:
lat: Observer latitude (-90 to 90)
lon: Observer longitude (-180 to 180)
hours: Hours ahead to predict (1-72)
min_elevation: Minimum max elevation in degrees (0-90)
include_trajectory: Include az/el trajectory points (30 points)
include_ground_track: Include lat/lon ground track points (60 points)
Returns:
List of pass dicts sorted by start time.
Raises:
ImportError: If skyfield is not installed.
"""
from skyfield.almanac import find_discrete
from skyfield.api import EarthSatellite, wgs84
from data.satellites import TLE_SATELLITES
# Use live TLE cache from satellite module if available (refreshed from CelesTrak).
# Cache the reference locally so repeated calls don't re-import each time.
tle_source = TLE_SATELLITES
if not hasattr(predict_passes, '_tle_ref') or \
(time.time() - getattr(predict_passes, '_tle_ref_ts', 0)) > 3600:
try:
from routes.satellite import _tle_cache
if _tle_cache:
predict_passes._tle_ref = _tle_cache
predict_passes._tle_ref_ts = time.time()
except ImportError:
pass
if hasattr(predict_passes, '_tle_ref') and predict_passes._tle_ref:
tle_source = predict_passes._tle_ref
ts = _get_timescale()
observer = wgs84.latlon(lat, lon)
t0 = ts.now()
t1 = ts.utc(t0.utc_datetime() + datetime.timedelta(hours=hours))
all_passes: list[dict[str, Any]] = []
for sat_key, sat_info in WEATHER_SATELLITES.items():
if not sat_info['active']:
continue
tle_data = tle_source.get(sat_info['tle_key'])
if not tle_data:
continue
satellite = EarthSatellite(tle_data[1], tle_data[2], tle_data[0], ts)
def above_horizon(t, _sat=satellite):
diff = _sat - observer
topocentric = diff.at(t)
alt, _, _ = topocentric.altaz()
return alt.degrees > 0
above_horizon.step_days = 1 / 720
try:
times, events = find_discrete(t0, t1, above_horizon)
except Exception:
continue
i = 0
while i < len(times):
if i < len(events) and events[i]: # Rising
rise_time = times[i]
set_time = None
for j in range(i + 1, len(times)):
if not events[j]: # Setting
set_time = times[j]
i = j
break
else:
i += 1
continue
if set_time is None:
i += 1
continue
rise_dt = rise_time.utc_datetime()
set_dt = set_time.utc_datetime()
duration_seconds = (
set_dt - rise_dt
).total_seconds()
duration_minutes = round(duration_seconds / 60, 1)
# Calculate max elevation (always) and trajectory points (only if requested)
max_el = 0.0
max_el_az = 0.0
trajectory: list[dict[str, float]] = []
num_traj_points = 30
for k in range(num_traj_points):
frac = k / (num_traj_points - 1)
t_point = ts.utc(
rise_time.utc_datetime()
+ datetime.timedelta(seconds=duration_seconds * frac)
)
diff = satellite - observer
topocentric = diff.at(t_point)
alt, az, _ = topocentric.altaz()
if alt.degrees > max_el:
max_el = alt.degrees
max_el_az = az.degrees
if include_trajectory:
trajectory.append({
'el': float(max(0, alt.degrees)),
'az': float(az.degrees),
})
if max_el < min_elevation:
i += 1
continue
# Rise/set azimuths
rise_topo = (satellite - observer).at(rise_time)
_, rise_az, _ = rise_topo.altaz()
set_topo = (satellite - observer).at(set_time)
_, set_az, _ = set_topo.altaz()
pass_data: dict[str, Any] = {
'id': f"{sat_key}_{rise_dt.strftime('%Y%m%d%H%M%S')}",
'satellite': sat_key,
'name': sat_info['name'],
'frequency': sat_info['frequency'],
'mode': sat_info['mode'],
'startTime': rise_dt.strftime('%Y-%m-%d %H:%M UTC'),
'startTimeISO': _format_utc_iso(rise_dt),
'endTimeISO': _format_utc_iso(set_dt),
'maxEl': round(max_el, 1),
'maxElAz': round(max_el_az, 1),
'riseAz': round(rise_az.degrees, 1),
'setAz': round(set_az.degrees, 1),
'duration': duration_minutes,
'quality': (
'excellent' if max_el >= 60
else 'good' if max_el >= 30
else 'fair'
),
}
if include_trajectory:
pass_data['trajectory'] = trajectory
if include_ground_track:
ground_track: list[dict[str, float]] = []
for k in range(60):
frac = k / 59
t_point = ts.utc(
rise_time.utc_datetime()
+ datetime.timedelta(seconds=duration_seconds * frac)
)
geocentric = satellite.at(t_point)
subpoint = wgs84.subpoint(geocentric)
ground_track.append({
'lat': float(subpoint.latitude.degrees),
'lon': float(subpoint.longitude.degrees),
})
pass_data['groundTrack'] = ground_track
all_passes.append(pass_data)
i += 1
all_passes.sort(key=lambda p: p['startTimeISO'])
return all_passes
"""Weather satellite pass prediction utility.
Shared prediction logic used by both the API endpoint and the auto-scheduler.
Delegates to utils.satellite_predict for core pass detection, then enriches
results with weather-satellite-specific metadata.
"""
from __future__ import annotations
import datetime
import time
from typing import Any
from utils.logging import get_logger
from utils.weather_sat import WEATHER_SATELLITES
logger = get_logger('intercept.weather_sat_predict')
# Cache skyfield timescale to avoid re-downloading/re-parsing per request
_cached_timescale = None
def _get_timescale():
global _cached_timescale
if _cached_timescale is None:
from skyfield.api import load
_cached_timescale = load.timescale()
return _cached_timescale
def _get_tle_source() -> dict:
"""Return the best available TLE source (live cache preferred over static data)."""
from data.satellites import TLE_SATELLITES
if not hasattr(_get_tle_source, '_ref') or \
(time.time() - getattr(_get_tle_source, '_ref_ts', 0)) > 3600:
try:
from routes.satellite import _tle_cache
if _tle_cache:
_get_tle_source._ref = _tle_cache
_get_tle_source._ref_ts = time.time()
except ImportError:
pass
return getattr(_get_tle_source, '_ref', None) or TLE_SATELLITES
def predict_passes(
lat: float,
lon: float,
hours: int = 24,
min_elevation: float = 15.0,
include_trajectory: bool = False,
include_ground_track: bool = False,
) -> list[dict[str, Any]]:
"""Predict upcoming weather satellite passes for an observer location.
Args:
lat: Observer latitude (-90 to 90)
lon: Observer longitude (-180 to 180)
hours: Hours ahead to predict (1-72)
min_elevation: Minimum peak elevation in degrees (0-90)
include_trajectory: Include az/el trajectory points for polar plot
include_ground_track: Include lat/lon ground track points for map
Returns:
List of pass dicts sorted by start time, enriched with weather-satellite
fields: id, satellite, name, frequency, mode, quality, riseAz, setAz,
maxElAz, and all standard fields from utils.satellite_predict.
"""
from skyfield.api import wgs84
from utils.satellite_predict import predict_passes as _predict_passes
tle_source = _get_tle_source()
ts = _get_timescale()
observer = wgs84.latlon(lat, lon)
t0 = ts.now()
t1 = ts.utc(t0.utc_datetime() + datetime.timedelta(hours=hours))
all_passes: list[dict[str, Any]] = []
for sat_key, sat_info in WEATHER_SATELLITES.items():
if not sat_info['active']:
continue
tle_data = tle_source.get(sat_info['tle_key'])
if not tle_data:
continue
sat_passes = _predict_passes(
tle_data,
observer,
ts,
t0,
t1,
min_el=min_elevation,
include_trajectory=include_trajectory,
include_ground_track=include_ground_track,
)
for p in sat_passes:
aos_iso = p['startTimeISO']
try:
aos_dt = datetime.datetime.fromisoformat(aos_iso)
pass_id = f"{sat_key}_{aos_dt.strftime('%Y%m%d%H%M%S')}"
except Exception:
pass_id = f"{sat_key}_{aos_iso}"
# Enrich with weather-satellite-specific fields
p['id'] = pass_id
p['satellite'] = sat_key
p['name'] = sat_info['name']
p['frequency'] = sat_info['frequency']
p['mode'] = sat_info['mode']
# Backwards-compatible aliases
p['riseAz'] = p['aosAz']
p['setAz'] = p['losAz']
p['maxElAz'] = p['tcaAz']
p['quality'] = (
'excellent' if p['maxEl'] >= 60
else 'good' if p['maxEl'] >= 30
else 'fair'
)
all_passes.extend(sat_passes)
all_passes.sort(key=lambda p: p['startTimeISO'])
return all_passes