feat: add unified SQLite-backed TLE store

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-06-11 16:58:09 +01:00
parent 2f6afd5e28
commit f4a9cb7da6
2 changed files with 159 additions and 0 deletions
+48
View File
@@ -0,0 +1,48 @@
"""Tests for the unified TLE store."""
import pytest
from utils import tle_store
@pytest.fixture(autouse=True)
def _fresh_db(tmp_path, monkeypatch):
"""Point the store at a throwaway database file."""
monkeypatch.setattr(tle_store, "_DB_PATH", tmp_path / "tle.db")
tle_store._reset_for_tests()
SAMPLE = (
"ISS (ZARYA)",
"1 25544U 98067A 23321.52083333 .00016717 00000-0 30171-3 0 9992",
"2 25544 51.6416 20.4567 0004561 45.3212 67.8912 15.49876543123456",
)
class TestTLEStore:
def test_seed_from_static_data(self):
"""First access seeds from data/satellites.py TLE_SATELLITES."""
tles = tle_store.all_tles()
assert "ISS" in tles
name, l1, l2 = tles["ISS"]
assert l1.startswith("1 ")
assert l2.startswith("2 ")
def test_update_and_get(self):
tle_store.update({"TEST-SAT": SAMPLE})
assert tle_store.get_tle("TEST-SAT") == SAMPLE
def test_get_missing_returns_none(self):
assert tle_store.get_tle("NO-SUCH-SAT") is None
def test_update_overwrites(self):
tle_store.update({"TEST-SAT": SAMPLE})
newer = (SAMPLE[0], SAMPLE[1].replace("23321", "26100"), SAMPLE[2])
tle_store.update({"TEST-SAT": newer})
assert tle_store.get_tle("TEST-SAT") == newer
def test_persists_across_reset(self):
"""Data survives a cache reset (i.e., it actually hit the database)."""
tle_store.update({"TEST-SAT": SAMPLE})
tle_store._reset_for_tests()
assert tle_store.get_tle("TEST-SAT") == SAMPLE
+111
View File
@@ -0,0 +1,111 @@
"""Unified TLE store.
Single source of truth for TLE data, shared by satellite tracking,
weather-satellite prediction, SSTV doppler, and the remote agent.
Backed by SQLite; seeded once from the static data/satellites.py.
Replaces three previous stores: routes/satellite._tle_cache (which
persisted by rewriting data/satellites.py at runtime),
utils/weather_sat_predict._tle_cache, and the agent's own download.
"""
import sqlite3
import threading
from pathlib import Path
from utils.logging import get_logger
logger = get_logger("intercept.tle_store")
_DB_PATH = Path(__file__).resolve().parent.parent / "instance" / "tle.db"
_lock = threading.Lock()
_cache: dict[str, tuple[str, str, str]] | None = None
def _connect() -> sqlite3.Connection:
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(_DB_PATH), check_same_thread=False)
conn.execute("PRAGMA journal_mode = WAL")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS tle_entries (
key TEXT PRIMARY KEY,
name TEXT NOT NULL,
line1 TEXT NOT NULL,
line2 TEXT NOT NULL,
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)
"""
)
return conn
def _seed_if_empty(conn: sqlite3.Connection) -> None:
count = conn.execute("SELECT COUNT(*) FROM tle_entries").fetchone()[0]
if count:
return
try:
from data.satellites import TLE_SATELLITES
except ImportError:
logger.warning("data/satellites.py unavailable; TLE store starts empty")
return
conn.executemany(
"INSERT OR REPLACE INTO tle_entries (key, name, line1, line2) VALUES (?, ?, ?, ?)",
[(key, name, l1, l2) for key, (name, l1, l2) in TLE_SATELLITES.items()],
)
conn.commit()
logger.info(f"Seeded TLE store with {len(TLE_SATELLITES)} entries")
def _load() -> dict[str, tuple[str, str, str]]:
global _cache
if _cache is None:
with _lock:
if _cache is None:
conn = _connect()
try:
_seed_if_empty(conn)
rows = conn.execute("SELECT key, name, line1, line2 FROM tle_entries").fetchall()
_cache = {key: (name, l1, l2) for key, name, l1, l2 in rows}
finally:
conn.close()
return _cache
def all_tles() -> dict[str, tuple[str, str, str]]:
"""Return all TLEs as {key: (name, line1, line2)}."""
return dict(_load())
def get_tle(key: str) -> tuple[str, str, str] | None:
"""Return (name, line1, line2) for a satellite key, or None."""
return _load().get(key)
def update(entries: dict[str, tuple[str, str, str]]) -> None:
"""Insert or replace TLE entries and refresh the cache."""
if not entries:
return
with _lock:
conn = _connect()
try:
conn.executemany(
"INSERT OR REPLACE INTO tle_entries (key, name, line1, line2, updated_at)"
" VALUES (?, ?, ?, ?, datetime('now'))",
[(key, name, l1, l2) for key, (name, l1, l2) in entries.items()],
)
conn.commit()
finally:
conn.close()
global _cache
_cache = None # rebuilt on next read
def _reset_for_tests() -> None:
"""Clear the in-memory cache so the next read hits the database.
Seeding only runs when the table is empty, so a reset never overwrites
entries written by a test.
"""
global _cache
_cache = None