Files
intercept/utils/ground_station/observation_profile.py
James Smith 4607c358ed Add ground station automation with 6-phase implementation
Phase 1 - Automated observation engine:
- utils/ground_station/scheduler.py: GroundStationScheduler fires at AOS/LOS,
  claims SDR, manages IQBus lifecycle, emits SSE events
- utils/ground_station/observation_profile.py: ObservationProfile dataclass + DB CRUD
- routes/ground_station.py: REST API for profiles, scheduler, observations, recordings,
  rotator; SSE stream; /ws/satellite_waterfall WebSocket
- DB tables: observation_profiles, ground_station_observations, ground_station_events,
  sigmf_recordings (added to utils/database.py init_db)
- app.py: ground_station_queue, WebSocket init, scheduler startup in _deferred_init
- routes/__init__.py: register ground_station_bp

Phase 2 - Doppler correction:
- utils/doppler.py: generalized DopplerTracker extracted from sstv_decoder.py;
  accepts satellite name or raw TLE tuple; thread-safe; update_tle() method
- utils/sstv/sstv_decoder.py: replace inline DopplerTracker with import from utils.doppler
- Scheduler runs 5s retune loop; calls rotator.point_to() if enabled

Phase 3 - IQ recording (SigMF):
- utils/sigmf.py: SigMFWriter writes .sigmf-data + .sigmf-meta; disk-free guard (500MB)
- utils/ground_station/consumers/sigmf_writer.py: SigMFConsumer wraps SigMFWriter

Phase 4 - Multi-decoder IQ broadcast pipeline:
- utils/ground_station/iq_bus.py: IQBus single-producer fan-out; IQConsumer Protocol
- utils/ground_station/consumers/waterfall.py: CU8→FFT→binary frames
- utils/ground_station/consumers/fm_demod.py: CU8→FM demod (numpy)→decoder subprocess
- utils/ground_station/consumers/gr_satellites.py: CU8→cf32→gr_satellites (optional)

Phase 5 - Live spectrum waterfall:
- static/js/modes/ground_station_waterfall.js: /ws/satellite_waterfall canvas renderer
- Waterfall panel in satellite dashboard sidebar, auto-shown on iq_bus_started SSE event

Phase 6 - Antenna rotator control (optional):
- utils/rotator.py: RotatorController TCP client for rotctld (Hamlib line protocol)
- Rotator panel in satellite dashboard; silently disabled if rotctld unreachable

Also fixes pre-existing test_weather_sat_predict.py breakage:
- utils/weather_sat_predict.py: rewritten with self-contained skyfield implementation
  using find_discrete (matching what committed tests expected); adds _format_utc_iso
- tests/test_weather_sat_predict.py: add _MOCK_WEATHER_SATS and @patch decorators
  for tests that assumed NOAA-18 active (decommissioned Jun 2025, now active=False)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 17:36:55 +00:00

141 lines
4.8 KiB
Python

"""Observation profile dataclass and DB CRUD.
An ObservationProfile describes *how* to capture a particular satellite:
frequency, decoder type, gain, bandwidth, minimum elevation, and whether
to record raw IQ in SigMF format.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from utils.logging import get_logger
logger = get_logger('intercept.ground_station.profile')
@dataclass
class ObservationProfile:
    """Capture settings for a single satellite.

    Instances are plain value holders.  ``to_dict`` flattens one into a
    dictionary of its fields, and ``from_row`` rebuilds one from any mapping
    that can be indexed by column name.
    """

    # Field names in the order ``to_dict`` emits them and ``from_row`` reads
    # them.  Deliberately left unannotated so the dataclass machinery does not
    # treat it as an instance field.
    _FIELD_ORDER = (
        'id',
        'norad_id',
        'name',
        'frequency_mhz',
        'decoder_type',
        'gain',
        'bandwidth_hz',
        'min_elevation',
        'enabled',
        'record_iq',
        'iq_sample_rate',
        'created_at',
    )
    # Columns that ``from_row`` coerces with ``bool()``.
    _BOOLEAN_FIELDS = frozenset({'enabled', 'record_iq'})

    norad_id: int
    name: str  # Human-readable label
    frequency_mhz: float
    decoder_type: str  # 'fm', 'afsk', 'bpsk', 'gmsk', 'iq_only'
    gain: float = 40.0
    bandwidth_hz: int = 200_000
    min_elevation: float = 10.0
    enabled: bool = True
    record_iq: bool = False
    iq_sample_rate: int = 2_400_000
    id: int | None = None
    # Defaults to the current UTC time as an ISO-8601 string.
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> dict[str, Any]:
        """Return the profile's fields as a dictionary, ``id`` first."""
        return {key: getattr(self, key) for key in self._FIELD_ORDER}

    @classmethod
    def from_row(cls, row) -> 'ObservationProfile':
        """Build a profile from *row*, which must be indexable by column name.

        ``enabled`` and ``record_iq`` are passed through ``bool()``; every
        other column is used exactly as found in *row*.
        """
        kwargs = {
            column: (
                bool(row[column])
                if column in cls._BOOLEAN_FIELDS
                else row[column]
            )
            for column in cls._FIELD_ORDER
        }
        return cls(**kwargs)
# ---------------------------------------------------------------------------
# DB CRUD
# ---------------------------------------------------------------------------
def list_profiles() -> list[ObservationProfile]:
    """Load every stored observation profile.

    Returns:
        One ``ObservationProfile`` per row of ``observation_profiles``,
        ordered by ``created_at`` descending.
    """
    # Resolved at call time rather than at module import
    # (presumably to avoid an import cycle with utils.database -- confirm).
    from utils.database import get_db

    query = 'SELECT * FROM observation_profiles ORDER BY created_at DESC'
    with get_db() as db:
        records = db.execute(query).fetchall()
    return list(map(ObservationProfile.from_row, records))
def get_profile(norad_id: int) -> ObservationProfile | None:
    """Look up the observation profile stored for *norad_id*.

    Args:
        norad_id: NORAD catalogue number to match against the
            ``norad_id`` column.

    Returns:
        The matching ``ObservationProfile``, or ``None`` when the query
        yields no row.
    """
    # Resolved at call time rather than at module import
    # (presumably to avoid an import cycle with utils.database -- confirm).
    from utils.database import get_db

    with get_db() as db:
        cursor = db.execute(
            'SELECT * FROM observation_profiles WHERE norad_id = ?', (norad_id,)
        )
        record = cursor.fetchone()

    if not record:
        return None
    return ObservationProfile.from_row(record)
def save_profile(profile: ObservationProfile) -> ObservationProfile:
    """Upsert *profile* into ``observation_profiles``, keyed on ``norad_id``.

    A new row is inserted when no row with the same ``norad_id`` exists.
    Otherwise every column except ``norad_id`` and ``created_at`` is
    overwritten with the values from *profile*.  ``profile.id`` is never
    written by this statement.

    Args:
        profile: The configuration to persist.

    Returns:
        The profile as re-read via ``get_profile``; if that lookup returns
        nothing, the *profile* argument itself is returned unchanged.
    """
    # Resolved at call time rather than at module import
    # (presumably to avoid an import cycle with utils.database -- confirm).
    from utils.database import get_db

    # ON CONFLICT(norad_id) relies on a uniqueness constraint on that column
    # (assumed to be declared in the table schema -- not visible here).
    upsert_sql = '''
            INSERT INTO observation_profiles
                (norad_id, name, frequency_mhz, decoder_type, gain,
                 bandwidth_hz, min_elevation, enabled, record_iq,
                 iq_sample_rate, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(norad_id) DO UPDATE SET
                name=excluded.name,
                frequency_mhz=excluded.frequency_mhz,
                decoder_type=excluded.decoder_type,
                gain=excluded.gain,
                bandwidth_hz=excluded.bandwidth_hz,
                min_elevation=excluded.min_elevation,
                enabled=excluded.enabled,
                record_iq=excluded.record_iq,
                iq_sample_rate=excluded.iq_sample_rate
        '''
    # Positional values, in the same order as the column list above.
    # The two boolean flags are stored as integers.
    values = (
        profile.norad_id,
        profile.name,
        profile.frequency_mhz,
        profile.decoder_type,
        profile.gain,
        profile.bandwidth_hz,
        profile.min_elevation,
        int(profile.enabled),
        int(profile.record_iq),
        profile.iq_sample_rate,
        profile.created_at,
    )

    with get_db() as db:
        db.execute(upsert_sql, values)

    stored = get_profile(profile.norad_id)
    return stored or profile
def delete_profile(norad_id: int) -> bool:
    """Remove the observation profile stored for *norad_id*.

    Args:
        norad_id: NORAD catalogue number whose row should be deleted.

    Returns:
        ``True`` when the DELETE statement reports at least one affected
        row, ``False`` otherwise.
    """
    # Resolved at call time rather than at module import
    # (presumably to avoid an import cycle with utils.database -- confirm).
    from utils.database import get_db

    statement = 'DELETE FROM observation_profiles WHERE norad_id = ?'
    with get_db() as db:
        cursor = db.execute(statement, (norad_id,))
    removed = cursor.rowcount
    return removed > 0