diff --git a/utils/doppler.py b/utils/doppler.py index 478a68a..187244f 100644 --- a/utils/doppler.py +++ b/utils/doppler.py @@ -14,7 +14,7 @@ from datetime import datetime, timedelta, timezone from utils.logging import get_logger -logger = get_logger('intercept.doppler') +logger = get_logger("intercept.doppler") # Speed of light in m/s SPEED_OF_LIGHT = 299_792_458.0 @@ -36,12 +36,12 @@ class DopplerInfo: def to_dict(self) -> dict: return { - 'frequency_hz': self.frequency_hz, - 'shift_hz': round(self.shift_hz, 1), - 'range_rate_km_s': round(self.range_rate_km_s, 3), - 'elevation': round(self.elevation, 1), - 'azimuth': round(self.azimuth, 1), - 'timestamp': self.timestamp.isoformat(), + "frequency_hz": self.frequency_hz, + "shift_hz": round(self.shift_hz, 1), + "range_rate_km_s": round(self.range_rate_km_s, 3), + "elevation": round(self.elevation, 1), + "azimuth": round(self.azimuth, 1), + "timestamp": self.timestamp.isoformat(), } @@ -55,7 +55,7 @@ class DopplerTracker: def __init__( self, - satellite_name: str = 'ISS', + satellite_name: str = "ISS", tle_data: tuple[str, str, str] | None = None, ): self._satellite_name = satellite_name @@ -105,20 +105,13 @@ class DopplerTracker: self._observer_lon = longitude self._enabled = True - logger.info( - f"DopplerTracker configured for {self._satellite_name} " - f"at ({latitude}, {longitude})" - ) + logger.info(f"DopplerTracker configured for {self._satellite_name} at ({latitude}, {longitude})") return True def update_tle(self, tle_data: tuple[str, str, str]) -> bool: """Update TLE data and re-configure if already enabled.""" self._tle_data = tle_data - if ( - self._enabled - and self._observer_lat is not None - and self._observer_lon is not None - ): + if self._enabled and self._observer_lat is not None and self._observer_lon is not None: return self.configure(self._observer_lat, self._observer_lon) return True @@ -177,19 +170,20 @@ class DopplerTracker: if self._tle_data: return self._tle_data - # Try the live TLE cache maintained by routes/satellite.py + # Try the unified TLE store try: - from routes.satellite import _tle_cache # type: ignore[import] - if _tle_cache: - tle = _tle_cache.get(self._satellite_name) - if tle: - return tle - except (ImportError, AttributeError): + from utils import tle_store + + tle = tle_store.get_tle(self._satellite_name) + if tle: + return tle + except Exception: pass # Fall back to static bundled data try: from data.satellites import TLE_SATELLITES + return TLE_SATELLITES.get(self._satellite_name) except ImportError: return None diff --git a/utils/ground_station/scheduler.py b/utils/ground_station/scheduler.py index c3bfd9a..60a2c0b 100644 --- a/utils/ground_station/scheduler.py +++ b/utils/ground_station/scheduler.py @@ -29,14 +29,15 @@ from typing import Any, Callable from utils.logging import get_logger -logger = get_logger('intercept.ground_station.scheduler') +logger = get_logger("intercept.ground_station.scheduler") # Env-configurable Doppler retune threshold (Hz) try: from config import GS_DOPPLER_THRESHOLD_HZ # type: ignore[import] except (ImportError, AttributeError): import os - GS_DOPPLER_THRESHOLD_HZ = int(os.environ.get('INTERCEPT_GS_DOPPLER_THRESHOLD_HZ', 500)) + + GS_DOPPLER_THRESHOLD_HZ = int(os.environ.get("INTERCEPT_GS_DOPPLER_THRESHOLD_HZ", 500)) DOPPLER_INTERVAL_SECONDS = 5 SCHEDULE_REFRESH_MINUTES = 30 @@ -47,6 +48,7 @@ CAPTURE_BUFFER_SECONDS = 30 # Scheduled observation (state machine) # --------------------------------------------------------------------------- + class ScheduledObservation: """A single scheduled pass for a profile.""" @@ -64,7 +66,7 @@ class ScheduledObservation: self.aos_iso = aos_iso self.los_iso = los_iso self.max_el = max_el - self.status: str = 'scheduled' + self.status: str = "scheduled" self._start_timer: threading.Timer | None = None self._stop_timer: threading.Timer | None = None @@ -78,13 +80,13 @@ class ScheduledObservation: def to_dict(self) -> dict[str, Any]: return { - 'id': self.id, - 'norad_id': self.profile_norad_id, - 'satellite': self.satellite_name, - 'aos': self.aos_iso, - 'los': self.los_iso, - 'max_el': self.max_el, - 'status': self.status, + "id": self.id, + "norad_id": self.profile_norad_id, + "satellite": self.satellite_name, + "aos": self.aos_iso, + "los": self.los_iso, + "max_el": self.max_el, + "status": self.status, } @@ -92,6 +94,7 @@ class ScheduledObservation: # Scheduler # --------------------------------------------------------------------------- + class GroundStationScheduler: """Automated ground station observation scheduler.""" @@ -104,11 +107,11 @@ class GroundStationScheduler: # Active capture state self._active_obs: ScheduledObservation | None = None - self._active_iq_bus = None # IQBus instance + self._active_iq_bus = None # IQBus instance self._active_waterfall_consumer = None self._doppler_thread: threading.Thread | None = None self._doppler_stop = threading.Event() - self._active_profile = None # ObservationProfile + self._active_profile = None # ObservationProfile self._active_doppler_tracker = None # DopplerTracker # Shared waterfall queue (consumed by /ws/satellite_waterfall) @@ -118,15 +121,13 @@ class GroundStationScheduler: self._lat: float = 0.0 self._lon: float = 0.0 self._device: int = 0 - self._sdr_type: str = 'rtlsdr' + self._sdr_type: str = "rtlsdr" # ------------------------------------------------------------------ # Public control API # ------------------------------------------------------------------ - def set_event_callback( - self, callback: Callable[[dict[str, Any]], None] - ) -> None: + def set_event_callback(self, callback: Callable[[dict[str, Any]], None]) -> None: self._event_callback = callback def enable( @@ -134,7 +135,7 @@ class GroundStationScheduler: lat: float, lon: float, device: int = 0, - sdr_type: str = 'rtlsdr', + sdr_type: str = "rtlsdr", ) -> dict[str, Any]: with self._lock: self._lat = lat @@ -157,8 +158,8 @@ class GroundStationScheduler: if obs._stop_timer: obs._stop_timer.cancel() self._observations.clear() - self._stop_active_capture(reason='scheduler_disabled') - return {'status': 'disabled'} + self._stop_active_capture(reason="scheduler_disabled") + return {"status": "disabled"} @property def enabled(self) -> bool: @@ -168,17 +169,14 @@ class GroundStationScheduler: with self._lock: active = self._active_obs.to_dict() if self._active_obs else None return { - 'enabled': self._enabled, - 'observer': {'latitude': self._lat, 'longitude': self._lon}, - 'device': self._device, - 'sdr_type': self._sdr_type, - 'scheduled_count': sum( - 1 for o in self._observations if o.status == 'scheduled' - ), - 'total_observations': len(self._observations), - 'active_observation': active, - 'waterfall_active': self._active_iq_bus is not None - and self._active_iq_bus.running, + "enabled": self._enabled, + "observer": {"latitude": self._lat, "longitude": self._lon}, + "device": self._device, + "sdr_type": self._sdr_type, + "scheduled_count": sum(1 for o in self._observations if o.status == "scheduled"), + "total_observations": len(self._observations), + "active_observation": active, + "waterfall_active": self._active_iq_bus is not None and self._active_iq_bus.running, } def get_scheduled_observations(self) -> list[dict[str, Any]]: @@ -188,9 +186,10 @@ class GroundStationScheduler: def trigger_manual(self, norad_id: int) -> tuple[bool, str]: """Immediately start a manual observation for the given NORAD ID.""" from utils.ground_station.observation_profile import get_profile + profile = get_profile(norad_id) if not profile: - return False, f'No observation profile for NORAD {norad_id}' + return False, f"No observation profile for NORAD {norad_id}" obs = ScheduledObservation( profile_norad_id=norad_id, satellite_name=profile.name, @@ -199,11 +198,11 @@ class GroundStationScheduler: max_el=90.0, ) self._execute_observation(obs) - return True, 'Manual observation started' + return True, "Manual observation started" def stop_active(self) -> dict[str, Any]: """Stop the currently running observation.""" - self._stop_active_capture(reason='manual_stop') + self._stop_active_capture(reason="manual_stop") return self.get_status() # ------------------------------------------------------------------ @@ -232,13 +231,13 @@ class GroundStationScheduler: with self._lock: # Cancel existing scheduled timers (keep active/complete) for obs in self._observations: - if obs.status == 'scheduled': + if obs.status == "scheduled": if obs._start_timer: obs._start_timer.cancel() if obs._stop_timer: obs._stop_timer.cancel() - history = [o for o in self._observations if o.status in ('complete', 'capturing', 'failed')] + history = [o for o in self._observations if o.status in ("complete", "capturing", "failed")] self._observations = history now = datetime.now(timezone.utc) @@ -254,14 +253,12 @@ class GroundStationScheduler: continue delay = max(0.0, (capture_start - now).total_seconds()) - obs._start_timer = threading.Timer( - delay, self._execute_observation, args=[obs] - ) + obs._start_timer = threading.Timer(delay, self._execute_observation, args=[obs]) obs._start_timer.daemon = True obs._start_timer.start() self._observations.append(obs) - scheduled = sum(1 for o in self._observations if o.status == 'scheduled') + scheduled = sum(1 for o in self._observations if o.status == "scheduled") logger.info(f"Ground station scheduler refreshed: {scheduled} observations scheduled") self._arm_refresh_timer() @@ -271,15 +268,11 @@ class GroundStationScheduler: self._refresh_timer.cancel() if not self._enabled: return - self._refresh_timer = threading.Timer( - SCHEDULE_REFRESH_MINUTES * 60, self._refresh_schedule - ) + self._refresh_timer = threading.Timer(SCHEDULE_REFRESH_MINUTES * 60, self._refresh_schedule) self._refresh_timer.daemon = True self._refresh_timer.start() - def _predict_passes_for_profiles( - self, profiles: list - ) -> list[ScheduledObservation]: + def _predict_passes_for_profiles(self, profiles: list) -> list[ScheduledObservation]: """Predict passes for each profile and return ScheduledObservation list.""" from skyfield.api import load, wgs84 @@ -289,11 +282,13 @@ class GroundStationScheduler: ts = load.timescale(builtin=True) except Exception: from skyfield.api import load as _load + ts = _load.timescale(builtin=True) observer = wgs84.latlon(self._lat, self._lon) now = datetime.now(timezone.utc) import datetime as _dt + t0 = ts.utc(now) t1 = ts.utc(now + _dt.timedelta(hours=24)) @@ -302,9 +297,7 @@ class GroundStationScheduler: for profile in profiles: tle = _find_tle_by_norad(profile.norad_id) if tle is None: - logger.warning( - f"No TLE for NORAD {profile.norad_id} ({profile.name}) — skipping" - ) + logger.warning(f"No TLE for NORAD {profile.norad_id} ({profile.name}) — skipping") continue try: passes = _predict_passes( @@ -325,9 +318,9 @@ class GroundStationScheduler: obs = ScheduledObservation( profile_norad_id=profile.norad_id, satellite_name=profile.name, - aos_iso=p.get('startTimeISO', ''), - los_iso=p.get('endTimeISO', ''), - max_el=float(p.get('maxEl', 0.0)), + aos_iso=p.get("startTimeISO", ""), + los_iso=p.get("endTimeISO", ""), + max_el=float(p.get("maxEl", 0.0)), ) observations.append(obs) @@ -341,25 +334,27 @@ class GroundStationScheduler: """Called at AOS (+ buffer) to start IQ capture.""" if not self._enabled: return - if obs.status == 'scheduled': - obs.status = 'capturing' + if obs.status == "scheduled": + obs.status = "capturing" else: return # already cancelled / complete from utils.ground_station.observation_profile import get_profile + profile = get_profile(obs.profile_norad_id) if not profile or not profile.enabled: - obs.status = 'failed' + obs.status = "failed" return # Claim SDR device try: import app as _app - err = _app.claim_sdr_device(self._device, 'ground_station_iq_bus', self._sdr_type) + + err = _app.claim_sdr_device(self._device, "ground_station_iq_bus", self._sdr_type) if err: logger.warning(f"Ground station: SDR busy — skipping {obs.satellite_name}: {err}") - obs.status = 'failed' - self._emit_event({'type': 'observation_skipped', 'observation': obs.to_dict(), 'reason': 'device_busy'}) + obs.status = "failed" + self._emit_event({"type": "observation_skipped", "observation": obs.to_dict(), "reason": "device_busy"}) return except ImportError: pass @@ -369,6 +364,7 @@ class GroundStationScheduler: # Build IQ bus from utils.ground_station.iq_bus import IQBus + bus = IQBus( center_mhz=profile.frequency_mhz, sample_rate=profile.iq_sample_rate, @@ -379,6 +375,7 @@ class GroundStationScheduler: # Attach waterfall consumer (always) from utils.ground_station.consumers.waterfall import WaterfallConsumer + wf_consumer = WaterfallConsumer(output_queue=self.waterfall_queue) bus.add_consumer(wf_consumer) @@ -393,13 +390,14 @@ class GroundStationScheduler: ok, err_msg = bus.start() if not ok: logger.error(f"Ground station: failed to start IQBus for {obs.satellite_name}: {err_msg}") - obs.status = 'failed' + obs.status = "failed" try: import app as _app + _app.release_sdr_device(self._device, self._sdr_type) except ImportError: pass - self._emit_event({'type': 'observation_failed', 'observation': obs.to_dict(), 'reason': err_msg}) + self._emit_event({"type": "observation_failed", "observation": obs.to_dict(), "reason": err_msg}) return with self._lock: @@ -410,13 +408,15 @@ class GroundStationScheduler: # Emit iq_bus_started SSE event (used by Phase 5 waterfall) span_mhz = profile.iq_sample_rate / 1e6 - self._emit_event({ - 'type': 'iq_bus_started', - 'observation': obs.to_dict(), - 'center_mhz': profile.frequency_mhz, - 'span_mhz': span_mhz, - }) - self._emit_event({'type': 'observation_started', 'observation': obs.to_dict()}) + self._emit_event( + { + "type": "iq_bus_started", + "observation": obs.to_dict(), + "center_mhz": profile.frequency_mhz, + "span_mhz": span_mhz, + } + ) + self._emit_event({"type": "observation_started", "observation": obs.to_dict()}) logger.info(f"Ground station: observation started for {obs.satellite_name} (NORAD {obs.profile_norad_id})") # Start Doppler correction thread @@ -426,15 +426,13 @@ class GroundStationScheduler: now = datetime.now(timezone.utc) stop_delay = (obs.los_dt + timedelta(seconds=CAPTURE_BUFFER_SECONDS) - now).total_seconds() if stop_delay > 0: - obs._stop_timer = threading.Timer( - stop_delay, self._stop_active_capture, kwargs={'reason': 'los'} - ) + obs._stop_timer = threading.Timer(stop_delay, self._stop_active_capture, kwargs={"reason": "los"}) obs._stop_timer.daemon = True obs._stop_timer.start() else: - self._stop_active_capture(reason='los_immediate') + self._stop_active_capture(reason="los_immediate") - def _stop_active_capture(self, *, reason: str = 'manual') -> None: + def _stop_active_capture(self, *, reason: str = "manual") -> None: """Stop the currently active capture and release the SDR device.""" with self._lock: bus = self._active_iq_bus @@ -451,17 +449,20 @@ class GroundStationScheduler: bus.stop() if obs: - obs.status = 'complete' - _update_observation_status(obs, 'complete') - self._emit_event({ - 'type': 'observation_complete', - 'observation': obs.to_dict(), - 'reason': reason, - }) - self._emit_event({'type': 'iq_bus_stopped', 'observation': obs.to_dict()}) + obs.status = "complete" + _update_observation_status(obs, "complete") + self._emit_event( + { + "type": "observation_complete", + "observation": obs.to_dict(), + "reason": reason, + } + ) + self._emit_event({"type": "iq_bus_stopped", "observation": obs.to_dict()}) try: import app as _app + _app.release_sdr_device(self._device, self._sdr_type) except ImportError: pass @@ -478,47 +479,53 @@ class GroundStationScheduler: tasks = _get_profile_tasks(profile) - if 'telemetry_ax25' in tasks: - if shutil.which('direwolf'): + if "telemetry_ax25" in tasks: + if shutil.which("direwolf"): from utils.ground_station.consumers.fm_demod import FMDemodConsumer + consumer = FMDemodConsumer( decoder_cmd=[ - 'direwolf', '-r', '48000', '-n', '1', '-b', '16', '-', + "direwolf", + "-r", + "48000", + "-n", + "1", + "-b", + "16", + "-", ], - modulation='fm', - on_decoded=lambda line: self._on_packet_decoded( - line, obs_db_id, obs, source='direwolf' - ), + modulation="fm", + on_decoded=lambda line: self._on_packet_decoded(line, obs_db_id, obs, source="direwolf"), ) bus.add_consumer(consumer) logger.info("Ground station: attached direwolf AX.25 decoder") else: logger.warning("direwolf not found — AX.25 decoding disabled") - if 'telemetry_gmsk' in tasks: - if shutil.which('multimon-ng'): + if "telemetry_gmsk" in tasks: + if shutil.which("multimon-ng"): from utils.ground_station.consumers.fm_demod import FMDemodConsumer + consumer = FMDemodConsumer( - decoder_cmd=['multimon-ng', '-t', 'raw', '-a', 'GMSK', '-'], - modulation='fm', - on_decoded=lambda line: self._on_packet_decoded( - line, obs_db_id, obs, source='multimon-ng' - ), + decoder_cmd=["multimon-ng", "-t", "raw", "-a", "GMSK", "-"], + modulation="fm", + on_decoded=lambda line: self._on_packet_decoded(line, obs_db_id, obs, source="multimon-ng"), ) bus.add_consumer(consumer) logger.info("Ground station: attached multimon-ng GMSK decoder") else: logger.warning("multimon-ng not found — GMSK decoding disabled") - if 'telemetry_bpsk' in tasks: + if "telemetry_bpsk" in tasks: from utils.ground_station.consumers.gr_satellites import GrSatConsumer + consumer = GrSatConsumer( satellite_name=profile.name, on_decoded=lambda pkt: self._on_packet_decoded( pkt, obs_db_id, obs, - source='gr_satellites', + source="gr_satellites", ), ) bus.add_consumer(consumer) @@ -539,15 +546,18 @@ class GroundStationScheduler: def _on_recording_complete(meta_path, data_path): _insert_recording_record(obs_db_id, meta_path, data_path, profile) - self._emit_event({ - 'type': 'recording_complete', - 'norad_id': profile.norad_id, - 'data_path': str(data_path), - 'meta_path': str(meta_path), - }) - if 'weather_meteor_lrpt' in _get_profile_tasks(profile): + self._emit_event( + { + "type": "recording_complete", + "norad_id": profile.norad_id, + "data_path": str(data_path), + "meta_path": str(meta_path), + } + ) + if "weather_meteor_lrpt" in _get_profile_tasks(profile): try: from utils.ground_station.meteor_backend import launch_meteor_decode + launch_meteor_decode( obs_db_id=obs_db_id, norad_id=profile.norad_id, @@ -559,13 +569,15 @@ class GroundStationScheduler: ) except Exception as e: logger.warning(f"Failed to launch Meteor decode backend: {e}") - self._emit_event({ - 'type': 'weather_decode_failed', - 'norad_id': profile.norad_id, - 'satellite': profile.name, - 'backend': 'meteor_lrpt', - 'message': str(e), - }) + self._emit_event( + { + "type": "weather_decode_failed", + "norad_id": profile.norad_id, + "satellite": profile.name, + "backend": "meteor_lrpt", + "message": str(e), + } + ) consumer = SigMFConsumer(metadata=meta, on_complete=_on_recording_complete) bus.add_consumer(consumer) @@ -597,7 +609,7 @@ class GroundStationScheduler: target=self._doppler_loop, args=[profile, tracker], daemon=True, - name='gs-doppler', + name="gs-doppler", ) t.start() self._doppler_thread = t @@ -624,15 +636,18 @@ class GroundStationScheduler: f"{corrected_mhz:.6f} MHz (el={info.elevation:.1f}°)" ) bus.retune(corrected_mhz) - self._emit_event({ - 'type': 'doppler_update', - 'norad_id': profile.norad_id, - **info.to_dict(), - }) + self._emit_event( + { + "type": "doppler_update", + "norad_id": profile.norad_id, + **info.to_dict(), + } + ) # Rotator control (Phase 6) try: from utils.rotator import get_rotator + rotator = get_rotator() if rotator.enabled: rotator.point_to(info.azimuth, info.elevation) @@ -651,20 +666,22 @@ class GroundStationScheduler: obs_db_id: int | None, obs: ScheduledObservation, *, - source: str = 'decoder', + source: str = "decoder", ) -> None: """Handle a decoded packet payload from a decoder consumer.""" - if payload is None or payload == '': + if payload is None or payload == "": return packet_event = _build_packet_event(payload, source) - _insert_event_record(obs_db_id, 'packet', json.dumps(packet_event)) - self._emit_event({ - 'type': 'packet_decoded', - 'norad_id': obs.profile_norad_id, - 'satellite': obs.satellite_name, - **packet_event, - }) + _insert_event_record(obs_db_id, "packet", json.dumps(packet_event)) + self._emit_event( + { + "type": "packet_decoded", + "norad_id": obs.profile_norad_id, + "satellite": obs.satellite_name, + **packet_event, + } + ) def _emit_event(self, event: dict[str, Any]) -> None: if self._event_callback: @@ -684,20 +701,24 @@ def _insert_observation_record(obs: ScheduledObservation, profile) -> int | None from datetime import datetime, timezone from utils.database import get_db + with get_db() as conn: - cur = conn.execute(''' + cur = conn.execute( + """ INSERT INTO ground_station_observations (profile_id, norad_id, satellite, aos_time, los_time, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) - ''', ( - profile.id, - obs.profile_norad_id, - obs.satellite_name, - obs.aos_iso, - obs.los_iso, - 'capturing', - datetime.now(timezone.utc).isoformat(), - )) + """, + ( + profile.id, + obs.profile_norad_id, + obs.satellite_name, + obs.aos_iso, + obs.los_iso, + "capturing", + datetime.now(timezone.utc).isoformat(), + ), + ) return cur.lastrowid except Exception as e: logger.warning(f"Failed to insert observation record: {e}") @@ -707,10 +728,11 @@ def _insert_observation_record(obs: ScheduledObservation, profile) -> int | None def _update_observation_status(obs: ScheduledObservation, status: str) -> None: try: from utils.database import get_db + with get_db() as conn: conn.execute( - 'UPDATE ground_station_observations SET status=? WHERE norad_id=? AND status=?', - (status, obs.profile_norad_id, 'capturing'), + "UPDATE ground_station_observations SET status=? WHERE norad_id=? AND status=?", + (status, obs.profile_norad_id, "capturing"), ) except Exception as e: logger.debug(f"Failed to update observation status: {e}") @@ -723,17 +745,21 @@ def _insert_event_record(obs_db_id: int | None, event_type: str, payload: str) - from datetime import datetime, timezone from utils.database import get_db + with get_db() as conn: - conn.execute(''' + conn.execute( + """ INSERT INTO ground_station_events (observation_id, event_type, payload_json, timestamp) VALUES (?, ?, ?, ?) - ''', (obs_db_id, event_type, payload, datetime.now(timezone.utc).isoformat())) + """, + (obs_db_id, event_type, payload, datetime.now(timezone.utc).isoformat()), + ) except Exception as e: logger.debug(f"Failed to insert event record: {e}") def _get_profile_tasks(profile) -> list[str]: - get_tasks = getattr(profile, 'get_tasks', None) + get_tasks = getattr(profile, "get_tasks", None) if callable(get_tasks): return get_tasks() return [] @@ -741,26 +767,26 @@ def _get_profile_tasks(profile) -> list[str]: def _profile_requires_iq_recording(profile) -> bool: tasks = _get_profile_tasks(profile) - return bool(getattr(profile, 'record_iq', False) or 'record_iq' in tasks or 'weather_meteor_lrpt' in tasks) + return bool(getattr(profile, "record_iq", False) or "record_iq" in tasks or "weather_meteor_lrpt" in tasks) def _build_packet_event(payload, source: str) -> dict[str, Any]: event: dict[str, Any] = { - 'source': source, - 'data': payload if isinstance(payload, str) else json.dumps(payload), - 'parsed': None, + "source": source, + "data": payload if isinstance(payload, str) else json.dumps(payload), + "parsed": None, } if isinstance(payload, dict): - event['parsed'] = payload - event['protocol'] = payload.get('protocol') or payload.get('type') or source + event["parsed"] = payload + event["protocol"] = payload.get("protocol") or payload.get("type") or source return event text = str(payload).strip() - event['data'] = text + event["data"] = text parsed = None - if source == 'gr_satellites': + if source == "gr_satellites": try: candidate = json.loads(text) if isinstance(candidate, dict): @@ -774,7 +800,7 @@ def _build_packet_event(payload, source: str) -> dict[str, Any]: from utils.satellite_telemetry import auto_parse - for token in text.replace(',', ' ').split(): + for token in text.replace(",", " ").split(): cleaned = token.strip() if not cleaned or len(cleaned) < 8: continue @@ -789,9 +815,9 @@ def _build_packet_event(payload, source: str) -> dict[str, Any]: except Exception: parsed = None - event['parsed'] = parsed + event["parsed"] = parsed if isinstance(parsed, dict): - event['protocol'] = parsed.get('protocol') or source + event["protocol"] = parsed.get("protocol") or source return event @@ -800,22 +826,26 @@ def _insert_recording_record(obs_db_id: int | None, meta_path: Path, data_path: from datetime import datetime, timezone from utils.database import get_db + size = data_path.stat().st_size if data_path.exists() else 0 with get_db() as conn: - conn.execute(''' + conn.execute( + """ INSERT INTO sigmf_recordings (observation_id, sigmf_data_path, sigmf_meta_path, size_bytes, sample_rate, center_freq_hz, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) - ''', ( - obs_db_id, - str(data_path), - str(meta_path), - size, - profile.iq_sample_rate, - int(profile.frequency_mhz * 1e6), - datetime.now(timezone.utc).isoformat(), - )) + """, + ( + obs_db_id, + str(data_path), + str(meta_path), + size, + profile.iq_sample_rate, + int(profile.frequency_mhz * 1e6), + datetime.now(timezone.utc).isoformat(), + ), + ) except Exception as e: logger.warning(f"Failed to insert recording record: {e}") @@ -837,12 +867,12 @@ def _insert_output_record( with get_db() as conn: cur = conn.execute( - ''' + """ INSERT INTO ground_station_outputs (observation_id, norad_id, output_type, backend, file_path, preview_path, metadata_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ''', + """, ( observation_id, norad_id, @@ -870,13 +900,16 @@ def _find_tle_by_norad(norad_id: int) -> tuple[str, str, str] | None: # Try live cache first sources = [] try: - from routes.satellite import _tle_cache # type: ignore[import] - if _tle_cache: - sources.append(_tle_cache) - except (ImportError, AttributeError): + from utils import tle_store + + live = tle_store.all_tles() + if live: + sources.append(live) + except Exception: pass try: from data.satellites import TLE_SATELLITES + sources.append(TLE_SATELLITES) except ImportError: pass @@ -903,9 +936,9 @@ def _find_tle_by_norad(norad_id: int) -> tuple[str, str, str] | None: def _parse_utc_iso(value: str) -> datetime: - text = str(value).strip().replace('+00:00Z', 'Z') - if text.endswith('Z'): - text = text[:-1] + '+00:00' + text = str(value).strip().replace("+00:00Z", "Z") + if text.endswith("Z"): + text = text[:-1] + "+00:00" dt = datetime.fromisoformat(text) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc)