fix: harden TLE store for cross-process use

- busy_timeout so concurrent app+agent writers wait instead of raising
- seed from _connect() so update-before-first-read can't drop the seed
- regression tests: seed ordering, concurrent writer, default DB path

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-06-11 17:07:30 +01:00
parent f4a9cb7da6
commit 74d5663f73
2 changed files with 50 additions and 2 deletions
+44
View File
@@ -1,5 +1,7 @@
"""Tests for the unified TLE store."""
from pathlib import Path
import pytest
from utils import tle_store
@@ -46,3 +48,45 @@ class TestTLEStore:
tle_store.update({"TEST-SAT": SAMPLE})
tle_store._reset_for_tests()
assert tle_store.get_tle("TEST-SAT") == SAMPLE
def test_update_before_first_read_keeps_seed(self):
"""An update() on a fresh DB must not prevent the static seed."""
tle_store.update({"TEST-SAT": SAMPLE})
tles = tle_store.all_tles()
assert "TEST-SAT" in tles
assert "ISS" in tles # static seed still present
def test_update_waits_for_concurrent_writer(self):
"""A short-lived writer in another connection must not make update() raise."""
import sqlite3
import threading
tle_store.all_tles() # ensure DB exists
blocker = sqlite3.connect(str(tle_store._DB_PATH), check_same_thread=False)
blocker.execute("BEGIN IMMEDIATE")
def release_soon():
import time
time.sleep(0.2)
blocker.commit()
blocker.close()
t = threading.Thread(target=release_soon)
t.start()
tle_store.update({"TEST-SAT": SAMPLE}) # must block briefly, not raise
t.join()
assert tle_store.get_tle("TEST-SAT") == SAMPLE
def test_default_db_path_points_at_instance_dir():
"""The unpatched module constant must resolve to <repo>/instance/tle.db."""
import importlib
spec = importlib.util.find_spec("utils.tle_store")
module_file = Path(spec.origin)
expected = module_file.parent.parent / "instance" / "tle.db"
# Read the constant from a fresh module instance, not the patched one
fresh = importlib.util.module_from_spec(spec)
spec.loader.exec_module(fresh)
assert expected == fresh._DB_PATH
+6 -2
View File
@@ -24,8 +24,9 @@ _cache: dict[str, tuple[str, str, str]] | None = None
def _connect() -> sqlite3.Connection:
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(_DB_PATH), check_same_thread=False)
conn = sqlite3.connect(str(_DB_PATH), check_same_thread=False, timeout=5)
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA busy_timeout = 5000")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS tle_entries (
@@ -37,6 +38,7 @@ def _connect() -> sqlite3.Connection:
)
"""
)
_seed_if_empty(conn)
return conn
@@ -64,8 +66,10 @@ def _load() -> dict[str, tuple[str, str, str]]:
if _cache is None:
conn = _connect()
try:
_seed_if_empty(conn)
rows = conn.execute("SELECT key, name, line1, line2 FROM tle_entries").fetchall()
# Single-statement assignment of the fully built dict —
# the DCL pattern is only safe because readers never see
# a partially populated cache
_cache = {key: (name, l1, l2) for key, name, l1, l2 in rows}
finally:
conn.close()