fix: point doppler and ground-station scheduler at unified TLE store

Both silently fell back to static bundled TLEs after the removal of
routes.satellite._tle_cache.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-06-11 17:25:29 +01:00
parent 5e996654fe
commit 0af3028151
2 changed files with 220 additions and 193 deletions
+18 -24
View File
@@ -14,7 +14,7 @@ from datetime import datetime, timedelta, timezone
from utils.logging import get_logger
logger = get_logger('intercept.doppler')
logger = get_logger("intercept.doppler")
# Speed of light in m/s
SPEED_OF_LIGHT = 299_792_458.0
@@ -36,12 +36,12 @@ class DopplerInfo:
def to_dict(self) -> dict:
return {
'frequency_hz': self.frequency_hz,
'shift_hz': round(self.shift_hz, 1),
'range_rate_km_s': round(self.range_rate_km_s, 3),
'elevation': round(self.elevation, 1),
'azimuth': round(self.azimuth, 1),
'timestamp': self.timestamp.isoformat(),
"frequency_hz": self.frequency_hz,
"shift_hz": round(self.shift_hz, 1),
"range_rate_km_s": round(self.range_rate_km_s, 3),
"elevation": round(self.elevation, 1),
"azimuth": round(self.azimuth, 1),
"timestamp": self.timestamp.isoformat(),
}
@@ -55,7 +55,7 @@ class DopplerTracker:
def __init__(
self,
satellite_name: str = 'ISS',
satellite_name: str = "ISS",
tle_data: tuple[str, str, str] | None = None,
):
self._satellite_name = satellite_name
@@ -105,20 +105,13 @@ class DopplerTracker:
self._observer_lon = longitude
self._enabled = True
logger.info(
f"DopplerTracker configured for {self._satellite_name} "
f"at ({latitude}, {longitude})"
)
logger.info(f"DopplerTracker configured for {self._satellite_name} at ({latitude}, {longitude})")
return True
def update_tle(self, tle_data: tuple[str, str, str]) -> bool:
"""Update TLE data and re-configure if already enabled."""
self._tle_data = tle_data
if (
self._enabled
and self._observer_lat is not None
and self._observer_lon is not None
):
if self._enabled and self._observer_lat is not None and self._observer_lon is not None:
return self.configure(self._observer_lat, self._observer_lon)
return True
@@ -177,19 +170,20 @@ class DopplerTracker:
if self._tle_data:
return self._tle_data
# Try the live TLE cache maintained by routes/satellite.py
# Try the unified TLE store
try:
from routes.satellite import _tle_cache # type: ignore[import]
if _tle_cache:
tle = _tle_cache.get(self._satellite_name)
if tle:
return tle
except (ImportError, AttributeError):
from utils import tle_store
tle = tle_store.get_tle(self._satellite_name)
if tle:
return tle
except Exception:
pass
# Fall back to static bundled data
try:
from data.satellites import TLE_SATELLITES
return TLE_SATELLITES.get(self._satellite_name)
except ImportError:
return None
+202 -169
View File
@@ -29,14 +29,15 @@ from typing import Any, Callable
from utils.logging import get_logger
logger = get_logger('intercept.ground_station.scheduler')
logger = get_logger("intercept.ground_station.scheduler")
# Env-configurable Doppler retune threshold (Hz)
try:
from config import GS_DOPPLER_THRESHOLD_HZ # type: ignore[import]
except (ImportError, AttributeError):
import os
GS_DOPPLER_THRESHOLD_HZ = int(os.environ.get('INTERCEPT_GS_DOPPLER_THRESHOLD_HZ', 500))
GS_DOPPLER_THRESHOLD_HZ = int(os.environ.get("INTERCEPT_GS_DOPPLER_THRESHOLD_HZ", 500))
DOPPLER_INTERVAL_SECONDS = 5
SCHEDULE_REFRESH_MINUTES = 30
@@ -47,6 +48,7 @@ CAPTURE_BUFFER_SECONDS = 30
# Scheduled observation (state machine)
# ---------------------------------------------------------------------------
class ScheduledObservation:
"""A single scheduled pass for a profile."""
@@ -64,7 +66,7 @@ class ScheduledObservation:
self.aos_iso = aos_iso
self.los_iso = los_iso
self.max_el = max_el
self.status: str = 'scheduled'
self.status: str = "scheduled"
self._start_timer: threading.Timer | None = None
self._stop_timer: threading.Timer | None = None
@@ -78,13 +80,13 @@ class ScheduledObservation:
def to_dict(self) -> dict[str, Any]:
return {
'id': self.id,
'norad_id': self.profile_norad_id,
'satellite': self.satellite_name,
'aos': self.aos_iso,
'los': self.los_iso,
'max_el': self.max_el,
'status': self.status,
"id": self.id,
"norad_id": self.profile_norad_id,
"satellite": self.satellite_name,
"aos": self.aos_iso,
"los": self.los_iso,
"max_el": self.max_el,
"status": self.status,
}
@@ -92,6 +94,7 @@ class ScheduledObservation:
# Scheduler
# ---------------------------------------------------------------------------
class GroundStationScheduler:
"""Automated ground station observation scheduler."""
@@ -104,11 +107,11 @@ class GroundStationScheduler:
# Active capture state
self._active_obs: ScheduledObservation | None = None
self._active_iq_bus = None # IQBus instance
self._active_iq_bus = None # IQBus instance
self._active_waterfall_consumer = None
self._doppler_thread: threading.Thread | None = None
self._doppler_stop = threading.Event()
self._active_profile = None # ObservationProfile
self._active_profile = None # ObservationProfile
self._active_doppler_tracker = None # DopplerTracker
# Shared waterfall queue (consumed by /ws/satellite_waterfall)
@@ -118,15 +121,13 @@ class GroundStationScheduler:
self._lat: float = 0.0
self._lon: float = 0.0
self._device: int = 0
self._sdr_type: str = 'rtlsdr'
self._sdr_type: str = "rtlsdr"
# ------------------------------------------------------------------
# Public control API
# ------------------------------------------------------------------
def set_event_callback(
self, callback: Callable[[dict[str, Any]], None]
) -> None:
def set_event_callback(self, callback: Callable[[dict[str, Any]], None]) -> None:
self._event_callback = callback
def enable(
@@ -134,7 +135,7 @@ class GroundStationScheduler:
lat: float,
lon: float,
device: int = 0,
sdr_type: str = 'rtlsdr',
sdr_type: str = "rtlsdr",
) -> dict[str, Any]:
with self._lock:
self._lat = lat
@@ -157,8 +158,8 @@ class GroundStationScheduler:
if obs._stop_timer:
obs._stop_timer.cancel()
self._observations.clear()
self._stop_active_capture(reason='scheduler_disabled')
return {'status': 'disabled'}
self._stop_active_capture(reason="scheduler_disabled")
return {"status": "disabled"}
@property
def enabled(self) -> bool:
@@ -168,17 +169,14 @@ class GroundStationScheduler:
with self._lock:
active = self._active_obs.to_dict() if self._active_obs else None
return {
'enabled': self._enabled,
'observer': {'latitude': self._lat, 'longitude': self._lon},
'device': self._device,
'sdr_type': self._sdr_type,
'scheduled_count': sum(
1 for o in self._observations if o.status == 'scheduled'
),
'total_observations': len(self._observations),
'active_observation': active,
'waterfall_active': self._active_iq_bus is not None
and self._active_iq_bus.running,
"enabled": self._enabled,
"observer": {"latitude": self._lat, "longitude": self._lon},
"device": self._device,
"sdr_type": self._sdr_type,
"scheduled_count": sum(1 for o in self._observations if o.status == "scheduled"),
"total_observations": len(self._observations),
"active_observation": active,
"waterfall_active": self._active_iq_bus is not None and self._active_iq_bus.running,
}
def get_scheduled_observations(self) -> list[dict[str, Any]]:
@@ -188,9 +186,10 @@ class GroundStationScheduler:
def trigger_manual(self, norad_id: int) -> tuple[bool, str]:
"""Immediately start a manual observation for the given NORAD ID."""
from utils.ground_station.observation_profile import get_profile
profile = get_profile(norad_id)
if not profile:
return False, f'No observation profile for NORAD {norad_id}'
return False, f"No observation profile for NORAD {norad_id}"
obs = ScheduledObservation(
profile_norad_id=norad_id,
satellite_name=profile.name,
@@ -199,11 +198,11 @@ class GroundStationScheduler:
max_el=90.0,
)
self._execute_observation(obs)
return True, 'Manual observation started'
return True, "Manual observation started"
def stop_active(self) -> dict[str, Any]:
"""Stop the currently running observation."""
self._stop_active_capture(reason='manual_stop')
self._stop_active_capture(reason="manual_stop")
return self.get_status()
# ------------------------------------------------------------------
@@ -232,13 +231,13 @@ class GroundStationScheduler:
with self._lock:
# Cancel existing scheduled timers (keep active/complete)
for obs in self._observations:
if obs.status == 'scheduled':
if obs.status == "scheduled":
if obs._start_timer:
obs._start_timer.cancel()
if obs._stop_timer:
obs._stop_timer.cancel()
history = [o for o in self._observations if o.status in ('complete', 'capturing', 'failed')]
history = [o for o in self._observations if o.status in ("complete", "capturing", "failed")]
self._observations = history
now = datetime.now(timezone.utc)
@@ -254,14 +253,12 @@ class GroundStationScheduler:
continue
delay = max(0.0, (capture_start - now).total_seconds())
obs._start_timer = threading.Timer(
delay, self._execute_observation, args=[obs]
)
obs._start_timer = threading.Timer(delay, self._execute_observation, args=[obs])
obs._start_timer.daemon = True
obs._start_timer.start()
self._observations.append(obs)
scheduled = sum(1 for o in self._observations if o.status == 'scheduled')
scheduled = sum(1 for o in self._observations if o.status == "scheduled")
logger.info(f"Ground station scheduler refreshed: {scheduled} observations scheduled")
self._arm_refresh_timer()
@@ -271,15 +268,11 @@ class GroundStationScheduler:
self._refresh_timer.cancel()
if not self._enabled:
return
self._refresh_timer = threading.Timer(
SCHEDULE_REFRESH_MINUTES * 60, self._refresh_schedule
)
self._refresh_timer = threading.Timer(SCHEDULE_REFRESH_MINUTES * 60, self._refresh_schedule)
self._refresh_timer.daemon = True
self._refresh_timer.start()
def _predict_passes_for_profiles(
self, profiles: list
) -> list[ScheduledObservation]:
def _predict_passes_for_profiles(self, profiles: list) -> list[ScheduledObservation]:
"""Predict passes for each profile and return ScheduledObservation list."""
from skyfield.api import load, wgs84
@@ -289,11 +282,13 @@ class GroundStationScheduler:
ts = load.timescale(builtin=True)
except Exception:
from skyfield.api import load as _load
ts = _load.timescale(builtin=True)
observer = wgs84.latlon(self._lat, self._lon)
now = datetime.now(timezone.utc)
import datetime as _dt
t0 = ts.utc(now)
t1 = ts.utc(now + _dt.timedelta(hours=24))
@@ -302,9 +297,7 @@ class GroundStationScheduler:
for profile in profiles:
tle = _find_tle_by_norad(profile.norad_id)
if tle is None:
logger.warning(
f"No TLE for NORAD {profile.norad_id} ({profile.name}) — skipping"
)
logger.warning(f"No TLE for NORAD {profile.norad_id} ({profile.name}) — skipping")
continue
try:
passes = _predict_passes(
@@ -325,9 +318,9 @@ class GroundStationScheduler:
obs = ScheduledObservation(
profile_norad_id=profile.norad_id,
satellite_name=profile.name,
aos_iso=p.get('startTimeISO', ''),
los_iso=p.get('endTimeISO', ''),
max_el=float(p.get('maxEl', 0.0)),
aos_iso=p.get("startTimeISO", ""),
los_iso=p.get("endTimeISO", ""),
max_el=float(p.get("maxEl", 0.0)),
)
observations.append(obs)
@@ -341,25 +334,27 @@ class GroundStationScheduler:
"""Called at AOS (+ buffer) to start IQ capture."""
if not self._enabled:
return
if obs.status == 'scheduled':
obs.status = 'capturing'
if obs.status == "scheduled":
obs.status = "capturing"
else:
return # already cancelled / complete
from utils.ground_station.observation_profile import get_profile
profile = get_profile(obs.profile_norad_id)
if not profile or not profile.enabled:
obs.status = 'failed'
obs.status = "failed"
return
# Claim SDR device
try:
import app as _app
err = _app.claim_sdr_device(self._device, 'ground_station_iq_bus', self._sdr_type)
err = _app.claim_sdr_device(self._device, "ground_station_iq_bus", self._sdr_type)
if err:
logger.warning(f"Ground station: SDR busy — skipping {obs.satellite_name}: {err}")
obs.status = 'failed'
self._emit_event({'type': 'observation_skipped', 'observation': obs.to_dict(), 'reason': 'device_busy'})
obs.status = "failed"
self._emit_event({"type": "observation_skipped", "observation": obs.to_dict(), "reason": "device_busy"})
return
except ImportError:
pass
@@ -369,6 +364,7 @@ class GroundStationScheduler:
# Build IQ bus
from utils.ground_station.iq_bus import IQBus
bus = IQBus(
center_mhz=profile.frequency_mhz,
sample_rate=profile.iq_sample_rate,
@@ -379,6 +375,7 @@ class GroundStationScheduler:
# Attach waterfall consumer (always)
from utils.ground_station.consumers.waterfall import WaterfallConsumer
wf_consumer = WaterfallConsumer(output_queue=self.waterfall_queue)
bus.add_consumer(wf_consumer)
@@ -393,13 +390,14 @@ class GroundStationScheduler:
ok, err_msg = bus.start()
if not ok:
logger.error(f"Ground station: failed to start IQBus for {obs.satellite_name}: {err_msg}")
obs.status = 'failed'
obs.status = "failed"
try:
import app as _app
_app.release_sdr_device(self._device, self._sdr_type)
except ImportError:
pass
self._emit_event({'type': 'observation_failed', 'observation': obs.to_dict(), 'reason': err_msg})
self._emit_event({"type": "observation_failed", "observation": obs.to_dict(), "reason": err_msg})
return
with self._lock:
@@ -410,13 +408,15 @@ class GroundStationScheduler:
# Emit iq_bus_started SSE event (used by Phase 5 waterfall)
span_mhz = profile.iq_sample_rate / 1e6
self._emit_event({
'type': 'iq_bus_started',
'observation': obs.to_dict(),
'center_mhz': profile.frequency_mhz,
'span_mhz': span_mhz,
})
self._emit_event({'type': 'observation_started', 'observation': obs.to_dict()})
self._emit_event(
{
"type": "iq_bus_started",
"observation": obs.to_dict(),
"center_mhz": profile.frequency_mhz,
"span_mhz": span_mhz,
}
)
self._emit_event({"type": "observation_started", "observation": obs.to_dict()})
logger.info(f"Ground station: observation started for {obs.satellite_name} (NORAD {obs.profile_norad_id})")
# Start Doppler correction thread
@@ -426,15 +426,13 @@ class GroundStationScheduler:
now = datetime.now(timezone.utc)
stop_delay = (obs.los_dt + timedelta(seconds=CAPTURE_BUFFER_SECONDS) - now).total_seconds()
if stop_delay > 0:
obs._stop_timer = threading.Timer(
stop_delay, self._stop_active_capture, kwargs={'reason': 'los'}
)
obs._stop_timer = threading.Timer(stop_delay, self._stop_active_capture, kwargs={"reason": "los"})
obs._stop_timer.daemon = True
obs._stop_timer.start()
else:
self._stop_active_capture(reason='los_immediate')
self._stop_active_capture(reason="los_immediate")
def _stop_active_capture(self, *, reason: str = 'manual') -> None:
def _stop_active_capture(self, *, reason: str = "manual") -> None:
"""Stop the currently active capture and release the SDR device."""
with self._lock:
bus = self._active_iq_bus
@@ -451,17 +449,20 @@ class GroundStationScheduler:
bus.stop()
if obs:
obs.status = 'complete'
_update_observation_status(obs, 'complete')
self._emit_event({
'type': 'observation_complete',
'observation': obs.to_dict(),
'reason': reason,
})
self._emit_event({'type': 'iq_bus_stopped', 'observation': obs.to_dict()})
obs.status = "complete"
_update_observation_status(obs, "complete")
self._emit_event(
{
"type": "observation_complete",
"observation": obs.to_dict(),
"reason": reason,
}
)
self._emit_event({"type": "iq_bus_stopped", "observation": obs.to_dict()})
try:
import app as _app
_app.release_sdr_device(self._device, self._sdr_type)
except ImportError:
pass
@@ -478,47 +479,53 @@ class GroundStationScheduler:
tasks = _get_profile_tasks(profile)
if 'telemetry_ax25' in tasks:
if shutil.which('direwolf'):
if "telemetry_ax25" in tasks:
if shutil.which("direwolf"):
from utils.ground_station.consumers.fm_demod import FMDemodConsumer
consumer = FMDemodConsumer(
decoder_cmd=[
'direwolf', '-r', '48000', '-n', '1', '-b', '16', '-',
"direwolf",
"-r",
"48000",
"-n",
"1",
"-b",
"16",
"-",
],
modulation='fm',
on_decoded=lambda line: self._on_packet_decoded(
line, obs_db_id, obs, source='direwolf'
),
modulation="fm",
on_decoded=lambda line: self._on_packet_decoded(line, obs_db_id, obs, source="direwolf"),
)
bus.add_consumer(consumer)
logger.info("Ground station: attached direwolf AX.25 decoder")
else:
logger.warning("direwolf not found — AX.25 decoding disabled")
if 'telemetry_gmsk' in tasks:
if shutil.which('multimon-ng'):
if "telemetry_gmsk" in tasks:
if shutil.which("multimon-ng"):
from utils.ground_station.consumers.fm_demod import FMDemodConsumer
consumer = FMDemodConsumer(
decoder_cmd=['multimon-ng', '-t', 'raw', '-a', 'GMSK', '-'],
modulation='fm',
on_decoded=lambda line: self._on_packet_decoded(
line, obs_db_id, obs, source='multimon-ng'
),
decoder_cmd=["multimon-ng", "-t", "raw", "-a", "GMSK", "-"],
modulation="fm",
on_decoded=lambda line: self._on_packet_decoded(line, obs_db_id, obs, source="multimon-ng"),
)
bus.add_consumer(consumer)
logger.info("Ground station: attached multimon-ng GMSK decoder")
else:
logger.warning("multimon-ng not found — GMSK decoding disabled")
if 'telemetry_bpsk' in tasks:
if "telemetry_bpsk" in tasks:
from utils.ground_station.consumers.gr_satellites import GrSatConsumer
consumer = GrSatConsumer(
satellite_name=profile.name,
on_decoded=lambda pkt: self._on_packet_decoded(
pkt,
obs_db_id,
obs,
source='gr_satellites',
source="gr_satellites",
),
)
bus.add_consumer(consumer)
@@ -539,15 +546,18 @@ class GroundStationScheduler:
def _on_recording_complete(meta_path, data_path):
_insert_recording_record(obs_db_id, meta_path, data_path, profile)
self._emit_event({
'type': 'recording_complete',
'norad_id': profile.norad_id,
'data_path': str(data_path),
'meta_path': str(meta_path),
})
if 'weather_meteor_lrpt' in _get_profile_tasks(profile):
self._emit_event(
{
"type": "recording_complete",
"norad_id": profile.norad_id,
"data_path": str(data_path),
"meta_path": str(meta_path),
}
)
if "weather_meteor_lrpt" in _get_profile_tasks(profile):
try:
from utils.ground_station.meteor_backend import launch_meteor_decode
launch_meteor_decode(
obs_db_id=obs_db_id,
norad_id=profile.norad_id,
@@ -559,13 +569,15 @@ class GroundStationScheduler:
)
except Exception as e:
logger.warning(f"Failed to launch Meteor decode backend: {e}")
self._emit_event({
'type': 'weather_decode_failed',
'norad_id': profile.norad_id,
'satellite': profile.name,
'backend': 'meteor_lrpt',
'message': str(e),
})
self._emit_event(
{
"type": "weather_decode_failed",
"norad_id": profile.norad_id,
"satellite": profile.name,
"backend": "meteor_lrpt",
"message": str(e),
}
)
consumer = SigMFConsumer(metadata=meta, on_complete=_on_recording_complete)
bus.add_consumer(consumer)
@@ -597,7 +609,7 @@ class GroundStationScheduler:
target=self._doppler_loop,
args=[profile, tracker],
daemon=True,
name='gs-doppler',
name="gs-doppler",
)
t.start()
self._doppler_thread = t
@@ -624,15 +636,18 @@ class GroundStationScheduler:
f"{corrected_mhz:.6f} MHz (el={info.elevation:.1f}°)"
)
bus.retune(corrected_mhz)
self._emit_event({
'type': 'doppler_update',
'norad_id': profile.norad_id,
**info.to_dict(),
})
self._emit_event(
{
"type": "doppler_update",
"norad_id": profile.norad_id,
**info.to_dict(),
}
)
# Rotator control (Phase 6)
try:
from utils.rotator import get_rotator
rotator = get_rotator()
if rotator.enabled:
rotator.point_to(info.azimuth, info.elevation)
@@ -651,20 +666,22 @@ class GroundStationScheduler:
obs_db_id: int | None,
obs: ScheduledObservation,
*,
source: str = 'decoder',
source: str = "decoder",
) -> None:
"""Handle a decoded packet payload from a decoder consumer."""
if payload is None or payload == '':
if payload is None or payload == "":
return
packet_event = _build_packet_event(payload, source)
_insert_event_record(obs_db_id, 'packet', json.dumps(packet_event))
self._emit_event({
'type': 'packet_decoded',
'norad_id': obs.profile_norad_id,
'satellite': obs.satellite_name,
**packet_event,
})
_insert_event_record(obs_db_id, "packet", json.dumps(packet_event))
self._emit_event(
{
"type": "packet_decoded",
"norad_id": obs.profile_norad_id,
"satellite": obs.satellite_name,
**packet_event,
}
)
def _emit_event(self, event: dict[str, Any]) -> None:
if self._event_callback:
@@ -684,20 +701,24 @@ def _insert_observation_record(obs: ScheduledObservation, profile) -> int | None
from datetime import datetime, timezone
from utils.database import get_db
with get_db() as conn:
cur = conn.execute('''
cur = conn.execute(
"""
INSERT INTO ground_station_observations
(profile_id, norad_id, satellite, aos_time, los_time, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
profile.id,
obs.profile_norad_id,
obs.satellite_name,
obs.aos_iso,
obs.los_iso,
'capturing',
datetime.now(timezone.utc).isoformat(),
))
""",
(
profile.id,
obs.profile_norad_id,
obs.satellite_name,
obs.aos_iso,
obs.los_iso,
"capturing",
datetime.now(timezone.utc).isoformat(),
),
)
return cur.lastrowid
except Exception as e:
logger.warning(f"Failed to insert observation record: {e}")
@@ -707,10 +728,11 @@ def _insert_observation_record(obs: ScheduledObservation, profile) -> int | None
def _update_observation_status(obs: ScheduledObservation, status: str) -> None:
try:
from utils.database import get_db
with get_db() as conn:
conn.execute(
'UPDATE ground_station_observations SET status=? WHERE norad_id=? AND status=?',
(status, obs.profile_norad_id, 'capturing'),
"UPDATE ground_station_observations SET status=? WHERE norad_id=? AND status=?",
(status, obs.profile_norad_id, "capturing"),
)
except Exception as e:
logger.debug(f"Failed to update observation status: {e}")
@@ -723,17 +745,21 @@ def _insert_event_record(obs_db_id: int | None, event_type: str, payload: str) -
from datetime import datetime, timezone
from utils.database import get_db
with get_db() as conn:
conn.execute('''
conn.execute(
"""
INSERT INTO ground_station_events (observation_id, event_type, payload_json, timestamp)
VALUES (?, ?, ?, ?)
''', (obs_db_id, event_type, payload, datetime.now(timezone.utc).isoformat()))
""",
(obs_db_id, event_type, payload, datetime.now(timezone.utc).isoformat()),
)
except Exception as e:
logger.debug(f"Failed to insert event record: {e}")
def _get_profile_tasks(profile) -> list[str]:
get_tasks = getattr(profile, 'get_tasks', None)
get_tasks = getattr(profile, "get_tasks", None)
if callable(get_tasks):
return get_tasks()
return []
@@ -741,26 +767,26 @@ def _get_profile_tasks(profile) -> list[str]:
def _profile_requires_iq_recording(profile) -> bool:
tasks = _get_profile_tasks(profile)
return bool(getattr(profile, 'record_iq', False) or 'record_iq' in tasks or 'weather_meteor_lrpt' in tasks)
return bool(getattr(profile, "record_iq", False) or "record_iq" in tasks or "weather_meteor_lrpt" in tasks)
def _build_packet_event(payload, source: str) -> dict[str, Any]:
event: dict[str, Any] = {
'source': source,
'data': payload if isinstance(payload, str) else json.dumps(payload),
'parsed': None,
"source": source,
"data": payload if isinstance(payload, str) else json.dumps(payload),
"parsed": None,
}
if isinstance(payload, dict):
event['parsed'] = payload
event['protocol'] = payload.get('protocol') or payload.get('type') or source
event["parsed"] = payload
event["protocol"] = payload.get("protocol") or payload.get("type") or source
return event
text = str(payload).strip()
event['data'] = text
event["data"] = text
parsed = None
if source == 'gr_satellites':
if source == "gr_satellites":
try:
candidate = json.loads(text)
if isinstance(candidate, dict):
@@ -774,7 +800,7 @@ def _build_packet_event(payload, source: str) -> dict[str, Any]:
from utils.satellite_telemetry import auto_parse
for token in text.replace(',', ' ').split():
for token in text.replace(",", " ").split():
cleaned = token.strip()
if not cleaned or len(cleaned) < 8:
continue
@@ -789,9 +815,9 @@ def _build_packet_event(payload, source: str) -> dict[str, Any]:
except Exception:
parsed = None
event['parsed'] = parsed
event["parsed"] = parsed
if isinstance(parsed, dict):
event['protocol'] = parsed.get('protocol') or source
event["protocol"] = parsed.get("protocol") or source
return event
@@ -800,22 +826,26 @@ def _insert_recording_record(obs_db_id: int | None, meta_path: Path, data_path:
from datetime import datetime, timezone
from utils.database import get_db
size = data_path.stat().st_size if data_path.exists() else 0
with get_db() as conn:
conn.execute('''
conn.execute(
"""
INSERT INTO sigmf_recordings
(observation_id, sigmf_data_path, sigmf_meta_path, size_bytes,
sample_rate, center_freq_hz, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
obs_db_id,
str(data_path),
str(meta_path),
size,
profile.iq_sample_rate,
int(profile.frequency_mhz * 1e6),
datetime.now(timezone.utc).isoformat(),
))
""",
(
obs_db_id,
str(data_path),
str(meta_path),
size,
profile.iq_sample_rate,
int(profile.frequency_mhz * 1e6),
datetime.now(timezone.utc).isoformat(),
),
)
except Exception as e:
logger.warning(f"Failed to insert recording record: {e}")
@@ -837,12 +867,12 @@ def _insert_output_record(
with get_db() as conn:
cur = conn.execute(
'''
"""
INSERT INTO ground_station_outputs
(observation_id, norad_id, output_type, backend, file_path,
preview_path, metadata_json, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''',
""",
(
observation_id,
norad_id,
@@ -870,13 +900,16 @@ def _find_tle_by_norad(norad_id: int) -> tuple[str, str, str] | None:
# Try live cache first
sources = []
try:
from routes.satellite import _tle_cache # type: ignore[import]
if _tle_cache:
sources.append(_tle_cache)
except (ImportError, AttributeError):
from utils import tle_store
live = tle_store.all_tles()
if live:
sources.append(live)
except Exception:
pass
try:
from data.satellites import TLE_SATELLITES
sources.append(TLE_SATELLITES)
except ImportError:
pass
@@ -903,9 +936,9 @@ def _find_tle_by_norad(norad_id: int) -> tuple[str, str, str] | None:
def _parse_utc_iso(value: str) -> datetime:
text = str(value).strip().replace('+00:00Z', 'Z')
if text.endswith('Z'):
text = text[:-1] + '+00:00'
text = str(value).strip().replace("+00:00Z", "Z")
if text.endswith("Z"):
text = text[:-1] + "+00:00"
dt = datetime.fromisoformat(text)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)