Add Meteor LRPT ground station pipeline

This commit is contained in:
James Smith
2026-03-18 22:01:52 +00:00
parent 5cae753e0d
commit e388baa464
7 changed files with 756 additions and 71 deletions
+54 -26
View File
@@ -641,18 +641,19 @@ def init_db() -> None:
# =====================================================================
# Observation profiles — per-satellite capture configuration
conn.execute('''
CREATE TABLE IF NOT EXISTS observation_profiles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
norad_id INTEGER UNIQUE NOT NULL,
name TEXT NOT NULL,
frequency_mhz REAL NOT NULL,
decoder_type TEXT NOT NULL DEFAULT 'fm',
gain REAL DEFAULT 40.0,
bandwidth_hz INTEGER DEFAULT 200000,
min_elevation REAL DEFAULT 10.0,
enabled BOOLEAN DEFAULT 1,
record_iq BOOLEAN DEFAULT 0,
conn.execute('''
CREATE TABLE IF NOT EXISTS observation_profiles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
norad_id INTEGER UNIQUE NOT NULL,
name TEXT NOT NULL,
frequency_mhz REAL NOT NULL,
decoder_type TEXT NOT NULL DEFAULT 'fm',
tasks_json TEXT,
gain REAL DEFAULT 40.0,
bandwidth_hz INTEGER DEFAULT 200000,
min_elevation REAL DEFAULT 10.0,
enabled BOOLEAN DEFAULT 1,
record_iq BOOLEAN DEFAULT 0,
iq_sample_rate INTEGER DEFAULT 2400000,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
@@ -688,31 +689,58 @@ def init_db() -> None:
''')
# SigMF recordings — one row per IQ recording file pair
conn.execute('''
CREATE TABLE IF NOT EXISTS sigmf_recordings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observation_id INTEGER,
sigmf_data_path TEXT NOT NULL,
conn.execute('''
CREATE TABLE IF NOT EXISTS sigmf_recordings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observation_id INTEGER,
sigmf_data_path TEXT NOT NULL,
sigmf_meta_path TEXT NOT NULL,
size_bytes INTEGER DEFAULT 0,
sample_rate INTEGER,
center_freq_hz INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (observation_id) REFERENCES ground_station_observations(id) ON DELETE SET NULL
)
''')
FOREIGN KEY (observation_id) REFERENCES ground_station_observations(id) ON DELETE SET NULL
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS ground_station_outputs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observation_id INTEGER,
norad_id INTEGER,
output_type TEXT NOT NULL,
backend TEXT,
file_path TEXT NOT NULL,
preview_path TEXT,
metadata_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (observation_id) REFERENCES ground_station_observations(id) ON DELETE CASCADE
)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gs_observations_norad
ON ground_station_observations(norad_id, created_at)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gs_events_observation
ON ground_station_events(observation_id, timestamp)
''')
logger.info("Database initialized successfully")
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gs_events_observation
ON ground_station_events(observation_id, timestamp)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gs_outputs_observation
ON ground_station_outputs(observation_id, created_at)
''')
# Lightweight schema migrations for existing installs.
profile_cols = {
row['name'] for row in conn.execute('PRAGMA table_info(observation_profiles)')
}
if 'tasks_json' not in profile_cols:
conn.execute('ALTER TABLE observation_profiles ADD COLUMN tasks_json TEXT')
logger.info("Database initialized successfully")
def close_db() -> None:
+189
View File
@@ -0,0 +1,189 @@
"""Meteor LRPT offline decode backend for ground-station observations."""
from __future__ import annotations
import threading
import time
from pathlib import Path
from utils.logging import get_logger
from utils.weather_sat import WeatherSatDecoder
logger = get_logger('intercept.ground_station.meteor_backend')
OUTPUT_ROOT = Path('instance/ground_station/weather_outputs')
DECODE_TIMEOUT_SECONDS = 30 * 60
_NORAD_TO_SAT_KEY = {
57166: 'METEOR-M2-3',
59051: 'METEOR-M2-4',
}
def resolve_meteor_satellite_key(norad_id: int, satellite_name: str) -> str | None:
if norad_id in _NORAD_TO_SAT_KEY:
return _NORAD_TO_SAT_KEY[norad_id]
upper = str(satellite_name or '').upper()
if 'M2-4' in upper:
return 'METEOR-M2-4'
if 'M2-3' in upper or 'METEOR' in upper:
return 'METEOR-M2-3'
return None
def launch_meteor_decode(
*,
obs_db_id: int | None,
norad_id: int,
satellite_name: str,
sample_rate: int,
data_path: Path,
emit_event,
register_output,
) -> None:
"""Run Meteor LRPT offline decode in a background thread."""
t = threading.Thread(
target=_run_decode,
kwargs={
'obs_db_id': obs_db_id,
'norad_id': norad_id,
'satellite_name': satellite_name,
'sample_rate': sample_rate,
'data_path': data_path,
'emit_event': emit_event,
'register_output': register_output,
},
daemon=True,
name=f'gs-meteor-decode-{norad_id}',
)
t.start()
def _run_decode(
*,
obs_db_id: int | None,
norad_id: int,
satellite_name: str,
sample_rate: int,
data_path: Path,
emit_event,
register_output,
) -> None:
sat_key = resolve_meteor_satellite_key(norad_id, satellite_name)
if not sat_key:
emit_event({
'type': 'weather_decode_failed',
'norad_id': norad_id,
'satellite': satellite_name,
'backend': 'meteor_lrpt',
'message': 'No Meteor satellite mapping is available for this observation.',
})
return
output_dir = OUTPUT_ROOT / f'{norad_id}_{int(time.time())}'
decoder = WeatherSatDecoder(output_dir=output_dir)
if decoder.decoder_available is None:
emit_event({
'type': 'weather_decode_failed',
'norad_id': norad_id,
'satellite': satellite_name,
'backend': 'meteor_lrpt',
'message': 'SatDump backend is not available for Meteor LRPT decode.',
})
return
def _progress_cb(progress):
progress_event = progress.to_dict()
progress_event.pop('type', None)
emit_event({
'type': 'weather_decode_progress',
'norad_id': norad_id,
'satellite': satellite_name,
'backend': 'meteor_lrpt',
**progress_event,
})
decoder.set_callback(_progress_cb)
emit_event({
'type': 'weather_decode_started',
'norad_id': norad_id,
'satellite': satellite_name,
'backend': 'meteor_lrpt',
'input_path': str(data_path),
})
ok, error = decoder.start_from_file(
satellite=sat_key,
input_file=data_path,
sample_rate=sample_rate,
)
if not ok:
emit_event({
'type': 'weather_decode_failed',
'norad_id': norad_id,
'satellite': satellite_name,
'backend': 'meteor_lrpt',
'message': error or 'Meteor decode failed to start.',
})
return
started = time.time()
while decoder.is_running and (time.time() - started) < DECODE_TIMEOUT_SECONDS:
time.sleep(1.0)
if decoder.is_running:
decoder.stop()
emit_event({
'type': 'weather_decode_failed',
'norad_id': norad_id,
'satellite': satellite_name,
'backend': 'meteor_lrpt',
'message': 'Meteor decode timed out.',
})
return
images = decoder.get_images()
if not images:
emit_event({
'type': 'weather_decode_failed',
'norad_id': norad_id,
'satellite': satellite_name,
'backend': 'meteor_lrpt',
'message': 'Decode completed but no image outputs were produced.',
})
return
outputs = []
for image in images:
metadata = {
'satellite': image.satellite,
'mode': image.mode,
'frequency': image.frequency,
'product': image.product,
'timestamp': image.timestamp.isoformat(),
'size_bytes': image.size_bytes,
}
output_id = register_output(
observation_id=obs_db_id,
norad_id=norad_id,
output_type='image',
backend='meteor_lrpt',
file_path=image.path,
preview_path=image.path,
metadata=metadata,
)
outputs.append({
'id': output_id,
'file_path': str(image.path),
'filename': image.filename,
'product': image.product,
})
emit_event({
'type': 'weather_decode_complete',
'norad_id': norad_id,
'satellite': satellite_name,
'backend': 'meteor_lrpt',
'outputs': outputs,
})
+77 -2
View File
@@ -7,6 +7,7 @@ to record raw IQ in SigMF format.
from __future__ import annotations
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
@@ -16,6 +17,52 @@ from utils.logging import get_logger
logger = get_logger('intercept.ground_station.profile')
VALID_TASK_TYPES = {
'telemetry_ax25',
'telemetry_gmsk',
'telemetry_bpsk',
'weather_meteor_lrpt',
'record_iq',
}
def legacy_decoder_to_tasks(decoder_type: str | None, record_iq: bool = False) -> list[str]:
decoder = (decoder_type or 'fm').lower()
tasks: list[str] = []
if decoder in ('fm', 'afsk'):
tasks.append('telemetry_ax25')
elif decoder == 'gmsk':
tasks.append('telemetry_gmsk')
elif decoder == 'bpsk':
tasks.append('telemetry_bpsk')
elif decoder == 'iq_only':
tasks.append('record_iq')
if record_iq and 'record_iq' not in tasks:
tasks.append('record_iq')
return tasks
def tasks_to_legacy_decoder(tasks: list[str]) -> str:
normalized = normalize_tasks(tasks)
if 'telemetry_bpsk' in normalized:
return 'bpsk'
if 'telemetry_gmsk' in normalized:
return 'gmsk'
if 'telemetry_ax25' in normalized:
return 'afsk'
return 'iq_only'
def normalize_tasks(tasks: list[str] | None) -> list[str]:
result: list[str] = []
for task in tasks or []:
value = str(task or '').strip().lower()
if value and value in VALID_TASK_TYPES and value not in result:
result.append(value)
return result
@dataclass
class ObservationProfile:
"""Per-satellite capture configuration."""
@@ -30,29 +77,50 @@ class ObservationProfile:
enabled: bool = True
record_iq: bool = False
iq_sample_rate: int = 2_400_000
tasks: list[str] = field(default_factory=list)
id: int | None = None
created_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
def to_dict(self) -> dict[str, Any]:
normalized_tasks = self.get_tasks()
return {
'id': self.id,
'norad_id': self.norad_id,
'name': self.name,
'frequency_mhz': self.frequency_mhz,
'decoder_type': self.decoder_type,
'legacy_decoder_type': self.decoder_type,
'gain': self.gain,
'bandwidth_hz': self.bandwidth_hz,
'min_elevation': self.min_elevation,
'enabled': self.enabled,
'record_iq': self.record_iq,
'iq_sample_rate': self.iq_sample_rate,
'tasks': normalized_tasks,
'created_at': self.created_at,
}
def get_tasks(self) -> list[str]:
tasks = normalize_tasks(self.tasks)
if not tasks:
tasks = legacy_decoder_to_tasks(self.decoder_type, self.record_iq)
if self.record_iq and 'record_iq' not in tasks:
tasks.append('record_iq')
if 'weather_meteor_lrpt' in tasks and 'record_iq' not in tasks:
tasks.append('record_iq')
return tasks
@classmethod
def from_row(cls, row) -> 'ObservationProfile':
tasks = []
raw_tasks = row['tasks_json'] if 'tasks_json' in row.keys() else None
if raw_tasks:
try:
tasks = normalize_tasks(json.loads(raw_tasks))
except (TypeError, ValueError, json.JSONDecodeError):
tasks = []
return cls(
id=row['id'],
norad_id=row['norad_id'],
@@ -65,6 +133,7 @@ class ObservationProfile:
enabled=bool(row['enabled']),
record_iq=bool(row['record_iq']),
iq_sample_rate=row['iq_sample_rate'],
tasks=tasks,
created_at=row['created_at'],
)
@@ -97,17 +166,22 @@ def get_profile(norad_id: int) -> ObservationProfile | None:
def save_profile(profile: ObservationProfile) -> ObservationProfile:
"""Insert or replace an observation profile. Returns the saved profile."""
from utils.database import get_db
normalized_tasks = profile.get_tasks()
profile.tasks = normalized_tasks
profile.record_iq = 'record_iq' in normalized_tasks
profile.decoder_type = tasks_to_legacy_decoder(normalized_tasks)
with get_db() as conn:
conn.execute('''
INSERT INTO observation_profiles
(norad_id, name, frequency_mhz, decoder_type, gain,
(norad_id, name, frequency_mhz, decoder_type, tasks_json, gain,
bandwidth_hz, min_elevation, enabled, record_iq,
iq_sample_rate, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(norad_id) DO UPDATE SET
name=excluded.name,
frequency_mhz=excluded.frequency_mhz,
decoder_type=excluded.decoder_type,
tasks_json=excluded.tasks_json,
gain=excluded.gain,
bandwidth_hz=excluded.bandwidth_hz,
min_elevation=excluded.min_elevation,
@@ -119,6 +193,7 @@ def save_profile(profile: ObservationProfile) -> ObservationProfile:
profile.name,
profile.frequency_mhz,
profile.decoder_type,
json.dumps(normalized_tasks),
profile.gain,
profile.bandwidth_hz,
profile.min_elevation,
+153 -20
View File
@@ -385,8 +385,8 @@ class GroundStationScheduler:
# Attach decoder consumers
self._attach_decoder_consumers(bus, profile, obs_db_id, obs)
# Attach SigMF consumer if requested
if profile.record_iq:
# Attach SigMF consumer when explicitly requested or required by tasks
if _profile_requires_iq_recording(profile):
self._attach_sigmf_consumer(bus, profile, obs_db_id)
# Start bus
@@ -473,12 +473,12 @@ class GroundStationScheduler:
# ------------------------------------------------------------------
def _attach_decoder_consumers(self, bus, profile, obs_db_id: int | None, obs) -> None:
"""Attach the appropriate decoder consumer based on profile.decoder_type."""
decoder_type = (profile.decoder_type or '').lower()
"""Attach consumers for all telemetry tasks on the profile."""
import shutil
if decoder_type in ('fm', 'afsk'):
# direwolf for AX.25 / AFSK
import shutil
tasks = _get_profile_tasks(profile)
if 'telemetry_ax25' in tasks:
if shutil.which('direwolf'):
from utils.ground_station.consumers.fm_demod import FMDemodConsumer
consumer = FMDemodConsumer(
@@ -486,41 +486,43 @@ class GroundStationScheduler:
'direwolf', '-r', '48000', '-n', '1', '-b', '16', '-',
],
modulation='fm',
on_decoded=lambda line: self._on_packet_decoded(line, obs_db_id, obs),
on_decoded=lambda line: self._on_packet_decoded(
line, obs_db_id, obs, source='direwolf'
),
)
bus.add_consumer(consumer)
logger.info("Ground station: attached direwolf AX.25 decoder")
else:
logger.warning("direwolf not found — AX.25 decoding disabled")
elif decoder_type == 'gmsk':
import shutil
if 'telemetry_gmsk' in tasks:
if shutil.which('multimon-ng'):
from utils.ground_station.consumers.fm_demod import FMDemodConsumer
consumer = FMDemodConsumer(
decoder_cmd=['multimon-ng', '-t', 'raw', '-a', 'GMSK', '-'],
modulation='fm',
on_decoded=lambda line: self._on_packet_decoded(line, obs_db_id, obs),
on_decoded=lambda line: self._on_packet_decoded(
line, obs_db_id, obs, source='multimon-ng'
),
)
bus.add_consumer(consumer)
logger.info("Ground station: attached multimon-ng GMSK decoder")
else:
logger.warning("multimon-ng not found — GMSK decoding disabled")
elif decoder_type == 'bpsk':
if 'telemetry_bpsk' in tasks:
from utils.ground_station.consumers.gr_satellites import GrSatConsumer
consumer = GrSatConsumer(
satellite_name=profile.name,
on_decoded=lambda pkt: self._on_packet_decoded(
json.dumps(pkt) if isinstance(pkt, dict) else str(pkt),
pkt,
obs_db_id,
obs,
source='gr_satellites',
),
)
bus.add_consumer(consumer)
# 'iq_only' → no decoder, just SigMF
def _attach_sigmf_consumer(self, bus, profile, obs_db_id: int | None) -> None:
"""Attach a SigMFConsumer for raw IQ recording."""
from utils.sigmf import SigMFMetadata
@@ -543,6 +545,27 @@ class GroundStationScheduler:
'data_path': str(data_path),
'meta_path': str(meta_path),
})
if 'weather_meteor_lrpt' in _get_profile_tasks(profile):
try:
from utils.ground_station.meteor_backend import launch_meteor_decode
launch_meteor_decode(
obs_db_id=obs_db_id,
norad_id=profile.norad_id,
satellite_name=profile.name,
sample_rate=profile.iq_sample_rate,
data_path=Path(data_path),
emit_event=self._emit_event,
register_output=_insert_output_record,
)
except Exception as e:
logger.warning(f"Failed to launch Meteor decode backend: {e}")
self._emit_event({
'type': 'weather_decode_failed',
'norad_id': profile.norad_id,
'satellite': profile.name,
'backend': 'meteor_lrpt',
'message': str(e),
})
consumer = SigMFConsumer(metadata=meta, on_complete=_on_recording_complete)
bus.add_consumer(consumer)
@@ -622,16 +645,25 @@ class GroundStationScheduler:
# Packet / event callbacks
# ------------------------------------------------------------------
def _on_packet_decoded(self, line: str, obs_db_id: int | None, obs: ScheduledObservation) -> None:
"""Handle a decoded packet line from a decoder consumer."""
if not line:
def _on_packet_decoded(
self,
payload,
obs_db_id: int | None,
obs: ScheduledObservation,
*,
source: str = 'decoder',
) -> None:
"""Handle a decoded packet payload from a decoder consumer."""
if payload is None or payload == '':
return
_insert_event_record(obs_db_id, 'packet', line)
packet_event = _build_packet_event(payload, source)
_insert_event_record(obs_db_id, 'packet', json.dumps(packet_event))
self._emit_event({
'type': 'packet_decoded',
'norad_id': obs.profile_norad_id,
'satellite': obs.satellite_name,
'data': line,
**packet_event,
})
def _emit_event(self, event: dict[str, Any]) -> None:
@@ -698,6 +730,68 @@ def _insert_event_record(obs_db_id: int | None, event_type: str, payload: str) -
logger.debug(f"Failed to insert event record: {e}")
def _get_profile_tasks(profile) -> list[str]:
get_tasks = getattr(profile, 'get_tasks', None)
if callable(get_tasks):
return get_tasks()
return []
def _profile_requires_iq_recording(profile) -> bool:
tasks = _get_profile_tasks(profile)
return bool(getattr(profile, 'record_iq', False) or 'record_iq' in tasks or 'weather_meteor_lrpt' in tasks)
def _build_packet_event(payload, source: str) -> dict[str, Any]:
event: dict[str, Any] = {
'source': source,
'data': payload if isinstance(payload, str) else json.dumps(payload),
'parsed': None,
}
if isinstance(payload, dict):
event['parsed'] = payload
event['protocol'] = payload.get('protocol') or payload.get('type') or source
return event
text = str(payload).strip()
event['data'] = text
parsed = None
if source == 'gr_satellites':
try:
candidate = json.loads(text)
if isinstance(candidate, dict):
parsed = candidate
except json.JSONDecodeError:
parsed = None
if parsed is None:
try:
from utils.satellite_telemetry import auto_parse
import base64
for token in text.replace(',', ' ').split():
cleaned = token.strip()
if not cleaned or len(cleaned) < 8:
continue
try:
raw = base64.b64decode(cleaned, validate=True)
except Exception:
continue
maybe = auto_parse(raw)
if maybe:
parsed = maybe
break
except Exception:
parsed = None
event['parsed'] = parsed
if isinstance(parsed, dict):
event['protocol'] = parsed.get('protocol') or source
return event
def _insert_recording_record(obs_db_id: int | None, meta_path: Path, data_path: Path, profile) -> None:
try:
from utils.database import get_db
@@ -722,6 +816,45 @@ def _insert_recording_record(obs_db_id: int | None, meta_path: Path, data_path:
logger.warning(f"Failed to insert recording record: {e}")
def _insert_output_record(
*,
observation_id: int | None,
norad_id: int | None,
output_type: str,
backend: str,
file_path: Path,
preview_path: Path | None = None,
metadata: dict[str, Any] | None = None,
) -> int | None:
try:
from utils.database import get_db
from datetime import datetime, timezone
with get_db() as conn:
cur = conn.execute(
'''
INSERT INTO ground_station_outputs
(observation_id, norad_id, output_type, backend, file_path,
preview_path, metadata_json, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''',
(
observation_id,
norad_id,
output_type,
backend,
str(file_path),
str(preview_path) if preview_path else None,
json.dumps(metadata or {}),
datetime.now(timezone.utc).isoformat(),
),
)
return cur.lastrowid
except Exception as e:
logger.warning(f"Failed to insert output record: {e}")
return None
# ---------------------------------------------------------------------------
# TLE lookup helpers
# ---------------------------------------------------------------------------
+17 -11
View File
@@ -32,7 +32,13 @@ from typing import Callable
from utils.logging import get_logger
from utils.process import register_process, safe_terminate
logger = get_logger('intercept.weather_sat')
logger = get_logger('intercept.weather_sat')
PROJECT_ROOT = Path(__file__).resolve().parent.parent
ALLOWED_OFFLINE_INPUT_DIRS = (
PROJECT_ROOT / 'data',
PROJECT_ROOT / 'instance' / 'ground_station' / 'recordings',
)
# Weather satellite definitions
@@ -277,16 +283,16 @@ class WeatherSatDecoder:
input_path = Path(input_file)
# Security: restrict to data directory
allowed_base = Path(__file__).resolve().parent.parent / 'data'
try:
resolved = input_path.resolve()
if not resolved.is_relative_to(allowed_base):
logger.warning(f"Path traversal blocked in start_from_file: {input_file}")
msg = 'Input file must be under the data/ directory'
self._emit_progress(CaptureProgress(
status='error',
message=msg,
# Security: restrict offline decode inputs to application-owned
# capture directories so external paths cannot be injected.
try:
resolved = input_path.resolve()
if not any(resolved.is_relative_to(base) for base in ALLOWED_OFFLINE_INPUT_DIRS):
logger.warning(f"Path traversal blocked in start_from_file: {input_file}")
msg = 'Input file must be under INTERCEPT data or ground-station recordings'
self._emit_progress(CaptureProgress(
status='error',
message=msg,
))
return False, msg
except (OSError, ValueError):