diff --git a/routes/system.py b/routes/system.py index ae0c365..68793b2 100644 --- a/routes/system.py +++ b/routes/system.py @@ -11,6 +11,7 @@ import contextlib import os import platform import queue +import shutil import socket import subprocess import threading @@ -38,7 +39,7 @@ try: except ImportError: _requests = None # type: ignore[assignment] -system_bp = Blueprint('system', __name__, url_prefix='/system') +system_bp = Blueprint("system", __name__, url_prefix="/system") # --------------------------------------------------------------------------- # Background metrics collector @@ -62,7 +63,7 @@ def _get_app_start_time() -> float: try: import app as app_module - _app_start_time = getattr(app_module, '_app_start_time', time.time()) + _app_start_time = getattr(app_module, "_app_start_time", time.time()) except Exception: _app_start_time = time.time() return _app_start_time @@ -75,7 +76,7 @@ def _get_app_version() -> str: return VERSION except Exception: - return 'unknown' + return "unknown" def _format_uptime(seconds: float) -> str: @@ -85,11 +86,11 @@ def _format_uptime(seconds: float) -> str: minutes = int((seconds % 3600) // 60) parts = [] if days > 0: - parts.append(f'{days}d') + parts.append(f"{days}d") if hours > 0: - parts.append(f'{hours}h') - parts.append(f'{minutes}m') - return ' '.join(parts) + parts.append(f"{hours}h") + parts.append(f"{minutes}m") + return " ".join(parts) def _collect_process_status() -> dict[str, bool]: @@ -110,15 +111,15 @@ def _collect_process_status() -> dict[str, bool]: return False processes: dict[str, bool] = { - 'pager': _alive('current_process'), - 'sensor': _alive('sensor_process'), - 'adsb': _alive('adsb_process'), - 'ais': _alive('ais_process'), - 'acars': _alive('acars_process'), - 'vdl2': _alive('vdl2_process'), - 'aprs': _alive('aprs_process'), - 'dsc': _alive('dsc_process'), - 'morse': _alive('morse_process'), + "pager": _alive("current_process"), + "sensor": _alive("sensor_process"), + "adsb": _alive("adsb_process"), + "ais": _alive("ais_process"), + "acars": _alive("acars_process"), + "vdl2": _alive("vdl2_process"), + "aprs": _alive("aprs_process"), + "dsc": _alive("dsc_process"), + "morse": _alive("morse_process"), } # WiFi @@ -126,26 +127,26 @@ def _collect_process_status() -> dict[str, bool]: from app import _get_wifi_health wifi_active, _, _ = _get_wifi_health() - processes['wifi'] = wifi_active + processes["wifi"] = wifi_active except Exception: - processes['wifi'] = False + processes["wifi"] = False # Bluetooth try: from app import _get_bluetooth_health bt_active, _ = _get_bluetooth_health() - processes['bluetooth'] = bt_active + processes["bluetooth"] = bt_active except Exception: - processes['bluetooth'] = False + processes["bluetooth"] = False # SubGHz try: from app import _get_subghz_active - processes['subghz'] = _get_subghz_active() + processes["subghz"] = _get_subghz_active() except Exception: - processes['subghz'] = False + processes["subghz"] = False return processes except Exception: @@ -154,15 +155,17 @@ def _collect_process_status() -> dict[str, bool]: def _collect_throttle_flags() -> str | None: """Read Raspberry Pi throttle flags via vcgencmd (Linux/Pi only).""" + if shutil.which("vcgencmd") is None: + return None try: result = subprocess.run( - ['vcgencmd', 'get_throttled'], + ["vcgencmd", "get_throttled"], capture_output=True, text=True, timeout=2, ) - if result.returncode == 0 and 'throttled=' in result.stdout: - return result.stdout.strip().split('=', 1)[1] + if result.returncode == 0 and "throttled=" in result.stdout: + return result.stdout.strip().split("=", 1)[1] except Exception: pass return None @@ -171,11 +174,11 @@ def _collect_throttle_flags() -> str | None: def _collect_power_draw() -> float | None: """Read power draw in watts from sysfs (Linux only).""" try: - power_supply = Path('/sys/class/power_supply') + power_supply = Path("/sys/class/power_supply") if not power_supply.exists(): return None for supply_dir in power_supply.iterdir(): - power_file = supply_dir / 'power_now' + power_file = supply_dir / "power_now" if power_file.exists(): val = int(power_file.read_text().strip()) return round(val / 1_000_000, 2) # microwatts to watts @@ -191,17 +194,17 @@ def _collect_metrics() -> dict[str, Any]: uptime_seconds = round(now - start, 2) metrics: dict[str, Any] = { - 'type': 'system_metrics', - 'timestamp': now, - 'system': { - 'hostname': socket.gethostname(), - 'platform': platform.platform(), - 'python': platform.python_version(), - 'version': _get_app_version(), - 'uptime_seconds': uptime_seconds, - 'uptime_human': _format_uptime(uptime_seconds), + "type": "system_metrics", + "timestamp": now, + "system": { + "hostname": socket.gethostname(), + "platform": platform.platform(), + "python": platform.python_version(), + "version": _get_app_version(), + "uptime_seconds": uptime_seconds, + "uptime_human": _format_uptime(uptime_seconds), }, - 'processes': _collect_process_status(), + "processes": _collect_process_status(), } if _HAS_PSUTIL: @@ -222,61 +225,61 @@ def _collect_metrics() -> dict[str, Any]: freq = psutil.cpu_freq() if freq: freq_data = { - 'current': round(freq.current, 0), - 'min': round(freq.min, 0), - 'max': round(freq.max, 0), + "current": round(freq.current, 0), + "min": round(freq.min, 0), + "max": round(freq.max, 0), } - metrics['cpu'] = { - 'percent': cpu_percent, - 'count': cpu_count, - 'load_1': round(load_1, 2), - 'load_5': round(load_5, 2), - 'load_15': round(load_15, 2), - 'per_core': per_core, - 'freq': freq_data, + metrics["cpu"] = { + "percent": cpu_percent, + "count": cpu_count, + "load_1": round(load_1, 2), + "load_5": round(load_5, 2), + "load_15": round(load_15, 2), + "per_core": per_core, + "freq": freq_data, } # Memory mem = psutil.virtual_memory() - metrics['memory'] = { - 'total': mem.total, - 'used': mem.used, - 'available': mem.available, - 'percent': mem.percent, + metrics["memory"] = { + "total": mem.total, + "used": mem.used, + "available": mem.available, + "percent": mem.percent, } swap = psutil.swap_memory() - metrics['swap'] = { - 'total': swap.total, - 'used': swap.used, - 'percent': swap.percent, + metrics["swap"] = { + "total": swap.total, + "used": swap.used, + "percent": swap.percent, } # Disk — usage + I/O counters try: - disk = psutil.disk_usage('/') - metrics['disk'] = { - 'total': disk.total, - 'used': disk.used, - 'free': disk.free, - 'percent': disk.percent, - 'path': '/', + disk = psutil.disk_usage("/") + metrics["disk"] = { + "total": disk.total, + "used": disk.used, + "free": disk.free, + "percent": disk.percent, + "path": "/", } except Exception: - metrics['disk'] = None + metrics["disk"] = None disk_io = None with contextlib.suppress(Exception): dio = psutil.disk_io_counters() if dio: disk_io = { - 'read_bytes': dio.read_bytes, - 'write_bytes': dio.write_bytes, - 'read_count': dio.read_count, - 'write_count': dio.write_count, + "read_bytes": dio.read_bytes, + "write_bytes": dio.write_bytes, + "read_count": dio.read_count, + "write_count": dio.write_count, } - metrics['disk_io'] = disk_io + metrics["disk_io"] = disk_io # Temperatures try: @@ -286,18 +289,18 @@ def _collect_metrics() -> dict[str, Any]: for chip, entries in temps.items(): temp_data[chip] = [ { - 'label': e.label or chip, - 'current': e.current, - 'high': e.high, - 'critical': e.critical, + "label": e.label or chip, + "current": e.current, + "high": e.high, + "critical": e.critical, } for e in entries ] - metrics['temperatures'] = temp_data + metrics["temperatures"] = temp_data else: - metrics['temperatures'] = None + metrics["temperatures"] = None except (AttributeError, Exception): - metrics['temperatures'] = None + metrics["temperatures"] = None # Fans fans_data = None @@ -306,11 +309,8 @@ def _collect_metrics() -> dict[str, Any]: if fans: fans_data = {} for chip, entries in fans.items(): - fans_data[chip] = [ - {'label': e.label or chip, 'current': e.current} - for e in entries - ] - metrics['fans'] = fans_data + fans_data[chip] = [{"label": e.label or chip, "current": e.current} for e in entries] + metrics["fans"] = fans_data # Battery battery_data = None @@ -318,11 +318,11 @@ def _collect_metrics() -> dict[str, Any]: bat = psutil.sensors_battery() if bat: battery_data = { - 'percent': bat.percent, - 'plugged': bat.power_plugged, - 'secs_left': bat.secsleft if bat.secsleft != psutil.POWER_TIME_UNLIMITED else None, + "percent": bat.percent, + "plugged": bat.power_plugged, + "secs_left": bat.secsleft if bat.secsleft != psutil.POWER_TIME_UNLIMITED else None, } - metrics['battery'] = battery_data + metrics["battery"] = battery_data # Network interfaces net_ifaces: list[dict[str, Any]] = [] @@ -330,25 +330,25 @@ def _collect_metrics() -> dict[str, Any]: addrs = psutil.net_if_addrs() stats = psutil.net_if_stats() for iface_name in sorted(addrs.keys()): - if iface_name == 'lo': + if iface_name == "lo": continue - iface_info: dict[str, Any] = {'name': iface_name} + iface_info: dict[str, Any] = {"name": iface_name} # Get addresses for addr in addrs[iface_name]: if addr.family == socket.AF_INET: - iface_info['ipv4'] = addr.address + iface_info["ipv4"] = addr.address elif addr.family == socket.AF_INET6: - iface_info.setdefault('ipv6', addr.address) + iface_info.setdefault("ipv6", addr.address) elif addr.family == psutil.AF_LINK: - iface_info['mac'] = addr.address + iface_info["mac"] = addr.address # Get stats if iface_name in stats: st = stats[iface_name] - iface_info['is_up'] = st.isup - iface_info['speed'] = st.speed # Mbps - iface_info['mtu'] = st.mtu + iface_info["is_up"] = st.isup + iface_info["speed"] = st.speed # Mbps + iface_info["mtu"] = st.mtu net_ifaces.append(iface_info) - metrics['network'] = {'interfaces': net_ifaces} + metrics["network"] = {"interfaces": net_ifaces} # Network I/O counters (raw — JS computes deltas) net_io = None @@ -357,43 +357,43 @@ def _collect_metrics() -> dict[str, Any]: if counters: net_io = {} for nic, c in counters.items(): - if nic == 'lo': + if nic == "lo": continue net_io[nic] = { - 'bytes_sent': c.bytes_sent, - 'bytes_recv': c.bytes_recv, + "bytes_sent": c.bytes_sent, + "bytes_recv": c.bytes_recv, } - metrics['network']['io'] = net_io + metrics["network"]["io"] = net_io # Connection count conn_count = 0 with contextlib.suppress(Exception): conn_count = len(psutil.net_connections()) - metrics['network']['connections'] = conn_count + metrics["network"]["connections"] = conn_count # Boot time boot_ts = None with contextlib.suppress(Exception): boot_ts = psutil.boot_time() - metrics['boot_time'] = boot_ts + metrics["boot_time"] = boot_ts # Power / throttle (Pi-specific) - metrics['power'] = { - 'throttled': _collect_throttle_flags(), - 'draw_watts': _collect_power_draw(), + metrics["power"] = { + "throttled": _collect_throttle_flags(), + "draw_watts": _collect_power_draw(), } else: - metrics['cpu'] = None - metrics['memory'] = None - metrics['swap'] = None - metrics['disk'] = None - metrics['disk_io'] = None - metrics['temperatures'] = None - metrics['fans'] = None - metrics['battery'] = None - metrics['network'] = None - metrics['boot_time'] = None - metrics['power'] = None + metrics["cpu"] = None + metrics["memory"] = None + metrics["swap"] = None + metrics["disk"] = None + metrics["disk_io"] = None + metrics["temperatures"] = None + metrics["fans"] = None + metrics["battery"] = None + metrics["network"] = None + metrics["boot_time"] = None + metrics["power"] = None return metrics @@ -416,7 +416,7 @@ def _collector_loop() -> None: _metrics_queue.get_nowait() _metrics_queue.put_nowait(metrics) except Exception as exc: - logger.debug('system metrics collection error: %s', exc) + logger.debug("system metrics collection error: %s", exc) time.sleep(3) @@ -428,15 +428,15 @@ def _ensure_collector() -> None: with _collector_lock: if _collector_started: return - t = threading.Thread(target=_collector_loop, daemon=True, name='system-metrics-collector') + t = threading.Thread(target=_collector_loop, daemon=True, name="system-metrics-collector") t.start() _collector_started = True - logger.info('System metrics collector started') + logger.info("System metrics collector started") def _get_observer_location() -> dict[str, Any]: """Get observer location from GPS state or config defaults.""" - lat, lon, source = None, None, 'none' + lat, lon, source = None, None, "none" gps_meta: dict[str, Any] = {} # Try GPS via utils.gps @@ -445,13 +445,13 @@ def _get_observer_location() -> dict[str, Any]: pos = get_current_position() if pos and pos.fix_quality >= 2: - lat, lon, source = pos.latitude, pos.longitude, 'gps' - gps_meta['fix_quality'] = pos.fix_quality - gps_meta['satellites'] = pos.satellites + lat, lon, source = pos.latitude, pos.longitude, "gps" + gps_meta["fix_quality"] = pos.fix_quality + gps_meta["satellites"] = pos.satellites if pos.epx is not None and pos.epy is not None: - gps_meta['accuracy'] = round(max(pos.epx, pos.epy), 1) + gps_meta["accuracy"] = round(max(pos.epx, pos.epy), 1) if pos.altitude is not None: - gps_meta['altitude'] = round(pos.altitude, 1) + gps_meta["altitude"] = round(pos.altitude, 1) # Fall back to config env vars if lat is None: @@ -459,7 +459,7 @@ def _get_observer_location() -> dict[str, Any]: from config import DEFAULT_LATITUDE, DEFAULT_LONGITUDE if DEFAULT_LATITUDE != 0.0 or DEFAULT_LONGITUDE != 0.0: - lat, lon, source = DEFAULT_LATITUDE, DEFAULT_LONGITUDE, 'config' + lat, lon, source = DEFAULT_LATITUDE, DEFAULT_LONGITUDE, "config" # Fall back to hardcoded constants (London) if lat is None: @@ -467,11 +467,11 @@ def _get_observer_location() -> dict[str, Any]: from utils.constants import DEFAULT_LATITUDE as CONST_LAT from utils.constants import DEFAULT_LONGITUDE as CONST_LON - lat, lon, source = CONST_LAT, CONST_LON, 'default' + lat, lon, source = CONST_LAT, CONST_LON, "default" - result: dict[str, Any] = {'lat': lat, 'lon': lon, 'source': source} + result: dict[str, Any] = {"lat": lat, "lon": lon, "source": source} if gps_meta: - result['gps'] = gps_meta + result["gps"] = gps_meta return result @@ -480,14 +480,14 @@ def _get_observer_location() -> dict[str, Any]: # --------------------------------------------------------------------------- -@system_bp.route('/metrics') +@system_bp.route("/metrics") def get_metrics() -> Response: """REST snapshot of current system metrics.""" _ensure_collector() return jsonify(_collect_metrics()) -@system_bp.route('/stream') +@system_bp.route("/stream") def stream_system() -> Response: """SSE stream for real-time system metrics.""" _ensure_collector() @@ -495,18 +495,18 @@ def stream_system() -> Response: response = Response( sse_stream_fanout( source_queue=_metrics_queue, - channel_key='system', + channel_key="system", timeout=SSE_QUEUE_TIMEOUT, keepalive_interval=SSE_KEEPALIVE_INTERVAL, ), - mimetype='text/event-stream', + mimetype="text/event-stream", ) - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' + response.headers["Cache-Control"] = "no-cache" + response.headers["X-Accel-Buffering"] = "no" return response -@system_bp.route('/sdr_devices') +@system_bp.route("/sdr_devices") def get_sdr_devices() -> Response: """Enumerate all connected SDR devices (on-demand, not every tick).""" try: @@ -515,26 +515,28 @@ def get_sdr_devices() -> Response: devices = detect_all_devices() result = [] for d in devices: - result.append({ - 'type': d.sdr_type.value if hasattr(d.sdr_type, 'value') else str(d.sdr_type), - 'index': d.index, - 'name': d.name, - 'serial': d.serial or '', - 'driver': d.driver or '', - }) - return jsonify({'devices': result}) + result.append( + { + "type": d.sdr_type.value if hasattr(d.sdr_type, "value") else str(d.sdr_type), + "index": d.index, + "name": d.name, + "serial": d.serial or "", + "driver": d.driver or "", + } + ) + return jsonify({"devices": result}) except Exception as exc: - logger.warning('SDR device detection failed: %s', exc) - return jsonify({'devices': [], 'error': str(exc)}) + logger.warning("SDR device detection failed: %s", exc) + return jsonify({"devices": [], "error": str(exc)}) -@system_bp.route('/location') +@system_bp.route("/location") def get_location() -> Response: """Return observer location from GPS or config.""" return jsonify(_get_observer_location()) -@system_bp.route('/weather') +@system_bp.route("/weather") def get_weather() -> Response: """Proxy weather from wttr.in, cached for 10 minutes.""" global _weather_cache, _weather_cache_time @@ -543,42 +545,42 @@ def get_weather() -> Response: if _weather_cache and (now - _weather_cache_time) < _WEATHER_CACHE_TTL: return jsonify(_weather_cache) - lat = request.args.get('lat', type=float) - lon = request.args.get('lon', type=float) + lat = request.args.get("lat", type=float) + lon = request.args.get("lon", type=float) if lat is None or lon is None: loc = _get_observer_location() - lat, lon = loc.get('lat'), loc.get('lon') + lat, lon = loc.get("lat"), loc.get("lon") if lat is None or lon is None: - return api_error('No location available') + return api_error("No location available") if _requests is None: - return api_error('requests library not available') + return api_error("requests library not available") try: resp = _requests.get( - f'https://wttr.in/{lat},{lon}?format=j1', + f"https://wttr.in/{lat},{lon}?format=j1", timeout=5, - headers={'User-Agent': 'INTERCEPT-SystemHealth/1.0'}, + headers={"User-Agent": "INTERCEPT-SystemHealth/1.0"}, ) resp.raise_for_status() data = resp.json() - current = data.get('current_condition', [{}])[0] + current = data.get("current_condition", [{}])[0] weather = { - 'temp_c': current.get('temp_C'), - 'temp_f': current.get('temp_F'), - 'condition': current.get('weatherDesc', [{}])[0].get('value', ''), - 'humidity': current.get('humidity'), - 'wind_mph': current.get('windspeedMiles'), - 'wind_dir': current.get('winddir16Point'), - 'feels_like_c': current.get('FeelsLikeC'), - 'visibility': current.get('visibility'), - 'pressure': current.get('pressure'), + "temp_c": current.get("temp_C"), + "temp_f": current.get("temp_F"), + "condition": current.get("weatherDesc", [{}])[0].get("value", ""), + "humidity": current.get("humidity"), + "wind_mph": current.get("windspeedMiles"), + "wind_dir": current.get("winddir16Point"), + "feels_like_c": current.get("FeelsLikeC"), + "visibility": current.get("visibility"), + "pressure": current.get("pressure"), } _weather_cache = weather _weather_cache_time = now return jsonify(weather) except Exception as exc: - logger.debug('Weather fetch failed: %s', exc) + logger.debug("Weather fetch failed: %s", exc) return api_error(str(exc)) diff --git a/tests/test_system.py b/tests/test_system.py index bd5fdac..70dff99 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -8,52 +8,66 @@ from unittest.mock import MagicMock, patch def _login(client): """Mark the Flask test session as authenticated.""" with client.session_transaction() as sess: - sess['logged_in'] = True - sess['username'] = 'test' - sess['role'] = 'admin' + sess["logged_in"] = True + sess["username"] = "test" + sess["role"] = "admin" def test_metrics_returns_expected_keys(client): """GET /system/metrics returns top-level metric keys.""" _login(client) - resp = client.get('/system/metrics') + resp = client.get("/system/metrics") assert resp.status_code == 200 data = resp.get_json() - assert 'system' in data - assert 'processes' in data - assert 'cpu' in data - assert 'memory' in data - assert 'disk' in data - assert data['system']['hostname'] - assert 'version' in data['system'] - assert 'uptime_seconds' in data['system'] - assert 'uptime_human' in data['system'] + assert "system" in data + assert "processes" in data + assert "cpu" in data + assert "memory" in data + assert "disk" in data + assert data["system"]["hostname"] + assert "version" in data["system"] + assert "uptime_seconds" in data["system"] + assert "uptime_human" in data["system"] def test_metrics_enhanced_keys(client): """GET /system/metrics returns enhanced metric keys.""" _login(client) - resp = client.get('/system/metrics') + resp = client.get("/system/metrics") assert resp.status_code == 200 data = resp.get_json() # New enhanced keys - assert 'network' in data - assert 'disk_io' in data - assert 'boot_time' in data - assert 'battery' in data - assert 'fans' in data - assert 'power' in data + assert "network" in data + assert "disk_io" in data + assert "boot_time" in data + assert "battery" in data + assert "fans" in data + assert "power" in data # CPU should have per_core and freq - if data['cpu'] is not None: - assert 'per_core' in data['cpu'] - assert 'freq' in data['cpu'] + if data["cpu"] is not None: + assert "per_core" in data["cpu"] + assert "freq" in data["cpu"] # Network should have interfaces and connections - if data['network'] is not None: - assert 'interfaces' in data['network'] - assert 'connections' in data['network'] - assert 'io' in data['network'] + if data["network"] is not None: + assert "interfaces" in data["network"] + assert "connections" in data["network"] + assert "io" in data["network"] + + +def test_throttle_flags_no_subprocess_without_vcgencmd(): + """No subprocess is spawned when vcgencmd is not on PATH (non-Pi hosts). + + The metrics collector thread runs for the whole process lifetime; if it + spawns subprocesses on hosts without vcgencmd, those calls leak into + other tests' subprocess mocks. + """ + import routes.system as mod + + with patch("routes.system.shutil.which", return_value=None), patch("routes.system.subprocess.run") as mock_run: + assert mod._collect_throttle_flags() is None + mock_run.assert_not_called() def test_metrics_without_psutil(client): @@ -64,18 +78,18 @@ def test_metrics_without_psutil(client): orig = mod._HAS_PSUTIL mod._HAS_PSUTIL = False try: - resp = client.get('/system/metrics') + resp = client.get("/system/metrics") assert resp.status_code == 200 data = resp.get_json() # These fields should be None without psutil - assert data['cpu'] is None - assert data['memory'] is None - assert data['disk'] is None - assert data['network'] is None - assert data['disk_io'] is None - assert data['battery'] is None - assert data['boot_time'] is None - assert data['power'] is None + assert data["cpu"] is None + assert data["memory"] is None + assert data["disk"] is None + assert data["network"] is None + assert data["disk_io"] is None + assert data["battery"] is None + assert data["boot_time"] is None + assert data["power"] is None finally: mod._HAS_PSUTIL = orig @@ -85,50 +99,50 @@ def test_sdr_devices_returns_list(client): _login(client) mock_device = MagicMock() mock_device.sdr_type = MagicMock() - mock_device.sdr_type.value = 'rtlsdr' + mock_device.sdr_type.value = "rtlsdr" mock_device.index = 0 - mock_device.name = 'Generic RTL2832U' - mock_device.serial = '00000001' - mock_device.driver = 'rtlsdr' + mock_device.name = "Generic RTL2832U" + mock_device.serial = "00000001" + mock_device.driver = "rtlsdr" - with patch('utils.sdr.detection.detect_all_devices', return_value=[mock_device]): - resp = client.get('/system/sdr_devices') + with patch("utils.sdr.detection.detect_all_devices", return_value=[mock_device]): + resp = client.get("/system/sdr_devices") assert resp.status_code == 200 data = resp.get_json() - assert 'devices' in data - assert len(data['devices']) == 1 - assert data['devices'][0]['type'] == 'rtlsdr' - assert data['devices'][0]['name'] == 'Generic RTL2832U' + assert "devices" in data + assert len(data["devices"]) == 1 + assert data["devices"][0]["type"] == "rtlsdr" + assert data["devices"][0]["name"] == "Generic RTL2832U" def test_sdr_devices_handles_detection_failure(client): """SDR detection failure returns empty list with error.""" _login(client) - with patch('utils.sdr.detection.detect_all_devices', side_effect=RuntimeError('no devices')): - resp = client.get('/system/sdr_devices') + with patch("utils.sdr.detection.detect_all_devices", side_effect=RuntimeError("no devices")): + resp = client.get("/system/sdr_devices") assert resp.status_code == 200 data = resp.get_json() - assert data['devices'] == [] - assert 'error' in data + assert data["devices"] == [] + assert "error" in data def test_stream_returns_sse_content_type(client): """GET /system/stream returns text/event-stream.""" _login(client) - resp = client.get('/system/stream') + resp = client.get("/system/stream") assert resp.status_code == 200 - assert 'text/event-stream' in resp.content_type + assert "text/event-stream" in resp.content_type def test_location_returns_shape(client): """GET /system/location returns lat/lon/source shape.""" _login(client) - resp = client.get('/system/location') + resp = client.get("/system/location") assert resp.status_code == 200 data = resp.get_json() - assert 'lat' in data - assert 'lon' in data - assert 'source' in data + assert "lat" in data + assert "lon" in data + assert "source" in data def test_location_from_gps(client): @@ -143,54 +157,55 @@ def test_location_from_gps(client): mock_pos.epy = 3.1 mock_pos.altitude = 45.0 - with patch('routes.system.get_current_position', return_value=mock_pos, create=True): + with patch("routes.system.get_current_position", return_value=mock_pos, create=True): # Patch the import inside the function import routes.system as mod + original = mod._get_observer_location def _patched(): - with patch('utils.gps.get_current_position', return_value=mock_pos): + with patch("utils.gps.get_current_position", return_value=mock_pos): return original() mod._get_observer_location = _patched try: - resp = client.get('/system/location') + resp = client.get("/system/location") finally: mod._get_observer_location = original assert resp.status_code == 200 data = resp.get_json() - assert data['source'] == 'gps' - assert data['lat'] == 51.5074 - assert data['lon'] == -0.1278 - assert data['gps']['fix_quality'] == 3 - assert data['gps']['satellites'] == 12 - assert data['gps']['accuracy'] == 3.1 - assert data['gps']['altitude'] == 45.0 + assert data["source"] == "gps" + assert data["lat"] == 51.5074 + assert data["lon"] == -0.1278 + assert data["gps"]["fix_quality"] == 3 + assert data["gps"]["satellites"] == 12 + assert data["gps"]["accuracy"] == 3.1 + assert data["gps"]["altitude"] == 45.0 def test_location_falls_back_to_defaults(client): """Location endpoint returns constants defaults when GPS and config unavailable.""" _login(client) - resp = client.get('/system/location') + resp = client.get("/system/location") assert resp.status_code == 200 data = resp.get_json() - assert 'source' in data + assert "source" in data # Should get location from config or default constants - assert data['lat'] is not None - assert data['lon'] is not None - assert data['source'] in ('config', 'default') + assert data["lat"] is not None + assert data["lon"] is not None + assert data["source"] in ("config", "default") def test_weather_requires_location(client): """Weather endpoint returns error when no location available.""" _login(client) # Without lat/lon params and no GPS state or config - resp = client.get('/system/weather') + resp = client.get("/system/weather") assert resp.status_code == 200 data = resp.get_json() # Either returns weather or error (depending on config) - assert 'error' in data or 'temp_c' in data + assert "error" in data or "temp_c" in data def test_weather_with_mocked_response(client): @@ -199,32 +214,35 @@ def test_weather_with_mocked_response(client): mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = { - 'current_condition': [{ - 'temp_C': '22', - 'temp_F': '72', - 'weatherDesc': [{'value': 'Clear'}], - 'humidity': '45', - 'windspeedMiles': '8', - 'winddir16Point': 'NW', - 'FeelsLikeC': '20', - 'visibility': '10', - 'pressure': '1013', - }] + "current_condition": [ + { + "temp_C": "22", + "temp_F": "72", + "weatherDesc": [{"value": "Clear"}], + "humidity": "45", + "windspeedMiles": "8", + "winddir16Point": "NW", + "FeelsLikeC": "20", + "visibility": "10", + "pressure": "1013", + } + ] } mock_resp.raise_for_status = MagicMock() import routes.system as mod + # Clear cache mod._weather_cache.clear() mod._weather_cache_time = 0.0 - with patch('routes.system._requests') as mock_requests: + with patch("routes.system._requests") as mock_requests: mock_requests.get.return_value = mock_resp - resp = client.get('/system/weather?lat=40.7&lon=-74.0') + resp = client.get("/system/weather?lat=40.7&lon=-74.0") assert resp.status_code == 200 data = resp.get_json() - assert data['temp_c'] == '22' - assert data['condition'] == 'Clear' - assert data['humidity'] == '45' - assert data['wind_mph'] == '8' + assert data["temp_c"] == "22" + assert data["condition"] == "Clear" + assert data["humidity"] == "45" + assert data["wind_mph"] == "8"