diff --git a/pyproject.toml b/pyproject.toml index 97c476c..c5535ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,13 +27,16 @@ classifiers = [ ] dependencies = [ "flask>=3.0.0", + "flask-wtf>=1.2.0", + "flask-compress>=1.15", + "flask-limiter>=2.5.4", + "flask-sock", + "simple-websocket>=0.5.1", + "websocket-client>=1.6.0", "skyfield>=1.45", "pyserial>=3.5", "Werkzeug>=3.1.5", - "flask-limiter>=2.5.4", "bleak>=0.21.0", - "flask-sock", - "websocket-client>=1.6.0", "requests>=2.28.0", ] @@ -51,6 +54,7 @@ dev = [ "black>=23.0.0", "mypy>=1.0.0", "types-flask>=1.1.0", + "pre-commit>=3.0.0", ] optionals = [ @@ -59,8 +63,13 @@ optionals = [ "numpy>=1.24.0", "Pillow>=9.0.0", "meshtastic>=2.0.0", + "meshcore>=1.0.0", "psycopg2-binary>=2.9.9", "scapy>=2.4.5", + "cryptography>=41.0.0", + "psutil>=5.9.0", + "gunicorn>=21.2.0", + "gevent>=23.9.0", ] [project.scripts] diff --git a/routes/bluetooth_v2.py b/routes/bluetooth_v2.py index f725ffe..4676f0c 100644 --- a/routes/bluetooth_v2.py +++ b/routes/bluetooth_v2.py @@ -186,6 +186,19 @@ def load_seen_device_ids() -> set[str]: return {row["device_id"] for row in cursor} +# ============================================================================= +# HELPERS +# ============================================================================= + + +def device_to_dict(device: BTDeviceAggregate) -> dict: + """Serialize a BTDeviceAggregate to a JSON-safe dict with heuristics flattened to top level.""" + d = device.to_dict() + heuristics = d.pop("heuristics", {}) + d.update(heuristics) + return d + + # ============================================================================= # API ENDPOINTS # ============================================================================= diff --git a/tests/conftest.py b/tests/conftest.py index 30ad304..3521c96 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ """Pytest configuration and fixtures.""" import contextlib +import os import sqlite3 from unittest.mock import MagicMock, patch @@ -10,14 +11,15 @@ from app import app as flask_app from routes import register_blueprints -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def app(): """Create application for testing.""" - flask_app.config['TESTING'] = True + os.environ["INTERCEPT_DISABLE_AUTH"] = "1" + flask_app.config["TESTING"] = True # Disable CSRF for tests - flask_app.config['WTF_CSRF_ENABLED'] = False + flask_app.config["WTF_CSRF_ENABLED"] = False # Register blueprints only if not already registered - if 'pager' not in flask_app.blueprints: + if "pager" not in flask_app.blueprints: register_blueprints(flask_app) return flask_app @@ -37,8 +39,7 @@ def mock_subprocess(): mock_subprocess['run'].return_value.stdout = 'output' mock_subprocess['run'].return_value.returncode = 0 """ - with patch('subprocess.Popen') as mock_popen, \ - patch('subprocess.run') as mock_run: + with patch("subprocess.Popen") as mock_popen, patch("subprocess.run") as mock_run: mock_process = MagicMock() mock_process.poll.return_value = None mock_process.stdout = MagicMock() @@ -46,14 +47,12 @@ def mock_subprocess(): mock_process.pid = 12345 mock_popen.return_value = mock_process - mock_run.return_value = MagicMock( - returncode=0, stdout='', stderr='' - ) + mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") yield { - 'popen': mock_popen, - 'process': mock_process, - 'run': mock_run, + "popen": mock_popen, + "process": mock_process, + "run": mock_run, } @@ -65,14 +64,16 @@ def mock_sdr_device(): def test_example(mock_sdr_device): device = mock_sdr_device(device_type='rtlsdr', index=0) """ - def _factory(device_type='rtlsdr', index=0): + + def _factory(device_type="rtlsdr", index=0): device = MagicMock() device.device_type = device_type device.device_index = index - device.name = f'Mock {device_type} #{index}' + device.name = f"Mock {device_type} #{index}" device.is_available.return_value = True - device.build_command.return_value = ['rtl_fm', '-f', '100M'] + device.build_command.return_value = ["rtl_fm", "-f", "100M"] return device + return _factory @@ -92,9 +93,9 @@ def mock_app_state(): mock_lock = MagicMock() patches = { - 'current_process': mock_process, - 'pager_queue': mock_queue, - 'pager_lock': mock_lock, + "current_process": mock_process, + "pager_queue": mock_queue, + "pager_lock": mock_lock, } originals = {} for attr, value in patches.items(): @@ -102,10 +103,10 @@ def mock_app_state(): setattr(app_module, attr, value) yield { - 'process': mock_process, - 'queue': mock_queue, - 'lock': mock_lock, - 'module': app_module, + "process": mock_process, + "queue": mock_queue, + "lock": mock_lock, + "module": app_module, } for attr, orig in originals.items(): @@ -119,16 +120,16 @@ def mock_app_state(): @pytest.fixture def mock_check_tool(): """Patch check_tool() to return True for all tools.""" - with patch('utils.dependencies.check_tool', return_value=True) as mock: + with patch("utils.dependencies.check_tool", return_value=True) as mock: yield mock @pytest.fixture def test_db(tmp_path): """Provide an isolated in-memory SQLite database for tests.""" - db_path = tmp_path / 'test.db' + db_path = tmp_path / "test.db" conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row - conn.execute('PRAGMA journal_mode = WAL') + conn.execute("PRAGMA journal_mode = WAL") yield conn conn.close() diff --git a/tests/test_satellite.py b/tests/test_satellite.py index d6b45f5..2983298 100644 --- a/tests/test_satellite.py +++ b/tests/test_satellite.py @@ -12,32 +12,36 @@ from routes.satellite import satellite_bp def app(): app = Flask(__name__) app.register_blueprint(satellite_bp) - app.config['TESTING'] = True + app.config["TESTING"] = True return app + @pytest.fixture def client(app): return app.test_client() + def test_predict_passes_invalid_coords(client): """Verify that invalid coordinates return a 400 error.""" payload = { "latitude": 150.0, # Invalid (>90) - "longitude": -0.1278 + "longitude": -0.1278, } - response = client.post('/satellite/predict', json=payload) + response = client.post("/satellite/predict", json=payload) assert response.status_code == 400 - assert response.json['status'] == 'error' + assert response.json["status"] == "error" + def test_fetch_celestrak_invalid_category(client): """Verify that an unauthorized category is rejected.""" - response = client.get('/satellite/celestrak/category_fake') + response = client.get("/satellite/celestrak/category_fake") assert response.status_code == 400 - assert response.json['status'] == 'error' - assert 'Invalid category' in response.json['message'] + assert response.json["status"] == "error" + assert "Invalid category" in response.json["message"] + # Mocking Tests (External Calls and Skyfield) -@patch('urllib.request.urlopen') +@patch("urllib.request.urlopen") def test_update_tle_success(mock_urlopen, client): """Simulate a successful response from CelesTrak.""" mock_content = ( @@ -51,26 +55,24 @@ def test_update_tle_success(mock_urlopen, client): mock_response.__enter__.return_value = mock_response mock_urlopen.return_value = mock_response - response = client.post('/satellite/update-tle') + response = client.post("/satellite/update-tle") assert response.status_code == 200 - assert response.json['status'] == 'success' - assert 'ISS' in response.json['updated'] + assert response.json["status"] == "success" + assert "ISS" in response.json["updated"] -@patch('skyfield.api.load') + +@patch("skyfield.api.load") def test_get_satellite_position_skyfield_error(mock_load, client): """Test behavior when Skyfield fails or data is missing.""" # Force the timescale load to fail mock_load.side_effect = Exception("Skyfield error") - payload = { - "latitude": 51.5, - "longitude": -0.1, - "satellites": ["ISS"] - } - response = client.post('/satellite/position', json=payload) + payload = {"latitude": 51.5, "longitude": -0.1, "satellites": ["ISS"]} + response = client.post("/satellite/position", json=payload) # Should return success but an empty positions list due to internal try-except assert response.status_code == 200 - assert response.json['positions'] == [] + assert response.json["positions"] == [] + def test_tracker_position_has_no_observer_fields(): """SSE tracker positions must NOT include observer-relative fields. @@ -83,9 +85,9 @@ def test_tracker_position_has_no_observer_fields(): from routes.satellite import _start_satellite_tracker ISS_TLE = ( - 'ISS (ZARYA)', - '1 25544U 98067A 24001.00000000 .00016717 00000-0 30171-3 0 9993', - '2 25544 51.6416 20.4567 0004561 45.3212 67.8912 15.49876543123457', + "ISS (ZARYA)", + "1 25544U 98067A 24001.00000000 .00016717 00000-0 30171-3 0 9993", + "2 25544 51.6416 20.4567 0004561 45.3212 67.8912 15.49876543123457", ) sat_q = queue.Queue(maxsize=5) @@ -93,54 +95,61 @@ def test_tracker_position_has_no_observer_fields(): mock_app.satellite_queue = sat_q from skyfield.api import load as _real_load + real_ts = _real_load.timescale(builtin=True) # Pre-populate track cache so the tracker loop doesn't block computing 90 points tle_key = (ISS_TLE[0], ISS_TLE[1][:20]) - stub_track = [{'lat': 0.0, 'lon': float(i), 'past': i < 45} for i in range(91)] - with patch('routes.satellite._tle_cache', {'ISS': ISS_TLE}), \ - patch('routes.satellite.get_tracked_satellites') as mock_tracked, \ - patch('routes.satellite._track_cache', {tle_key: (stub_track, 1e18)}), \ - patch('routes.satellite._get_timescale', return_value=real_ts), \ - patch.dict('sys.modules', {'app': mock_app}): - mock_tracked.return_value = [{ - 'name': 'ISS (ZARYA)', 'norad_id': 25544, - 'tle_line1': ISS_TLE[1], 'tle_line2': ISS_TLE[2], - }] + stub_track = [{"lat": 0.0, "lon": float(i), "past": i < 45} for i in range(91)] + with ( + patch("routes.satellite._tle_cache", {"ISS": ISS_TLE}), + patch("routes.satellite.get_tracked_satellites") as mock_tracked, + patch("routes.satellite._track_cache", {tle_key: (stub_track, 1e18)}), + patch("routes.satellite._get_timescale", return_value=real_ts), + patch.dict("sys.modules", {"app": mock_app}), + ): + mock_tracked.return_value = [ + { + "name": "ISS (ZARYA)", + "norad_id": 25544, + "tle_line1": ISS_TLE[1], + "tle_line2": ISS_TLE[2], + } + ] t = threading.Thread(target=_start_satellite_tracker, daemon=True) t.start() msg = sat_q.get(timeout=10) - assert msg['type'] == 'positions' - pos = msg['positions'][0] - for forbidden in ('elevation', 'azimuth', 'distance', 'visible'): + assert msg["type"] == "positions" + pos = msg["positions"][0] + for forbidden in ("elevation", "azimuth", "distance", "visible"): assert forbidden not in pos, f"SSE tracker must not emit '{forbidden}'" - for required in ('lat', 'lon', 'altitude', 'satellite', 'norad_id'): + for required in ("lat", "lon", "altitude", "satellite", "norad_id"): assert required in pos, f"SSE tracker must emit '{required}'" def test_predict_passes_currentpos_has_full_fields(client): """currentPos in pass results must include altitude, elevation, azimuth, distance.""" payload = { - 'latitude': 51.5074, - 'longitude': -0.1278, - 'hours': 48, - 'minEl': 5, - 'satellites': ['ISS'], + "latitude": 51.5074, + "longitude": -0.1278, + "hours": 2, + "minEl": 5, + "satellites": ["ISS"], } - response = client.post('/satellite/predict', json=payload) + response = client.post("/satellite/predict", json=payload) assert response.status_code == 200 data = response.json - assert data['status'] == 'success' - if data['passes']: - cp = data['passes'][0].get('currentPos', {}) - for field in ('lat', 'lon', 'altitude', 'elevation', 'azimuth', 'distance'): + assert data["status"] == "success" + if data["passes"]: + cp = data["passes"][0].get("currentPos", {}) + for field in ("lat", "lon", "altitude", "elevation", "azimuth", "distance"): assert field in cp, f"currentPos missing field: {field}" -@patch('routes.satellite.refresh_tle_data', return_value=['ISS']) -@patch('routes.satellite._load_db_satellites_into_cache') +@patch("routes.satellite.refresh_tle_data", return_value=["ISS"]) +@patch("routes.satellite._load_db_satellites_into_cache") def test_tle_auto_refresh_schedules_daily_repeat(mock_load_db, mock_refresh): """After the first TLE refresh, a 24-hour follow-up timer must be scheduled.""" import threading as real_threading @@ -158,28 +167,23 @@ def test_tle_auto_refresh_schedules_daily_repeat(mock_load_db, mock_refresh): if self._delay <= 5: self._fn() - with patch('routes.satellite.threading') as mock_threading: + with patch("routes.satellite.threading") as mock_threading: mock_threading.Timer = CapturingTimer mock_threading.Thread = real_threading.Thread from routes.satellite import init_tle_auto_refresh + init_tle_auto_refresh() # First timer: startup delay (≤5s); second timer: 24h repeat (≥86400s) - assert any(d <= 5 for d in scheduled_delays), \ - f"Expected startup delay timer; got delays: {scheduled_delays}" - assert any(d >= 86400 for d in scheduled_delays), \ - f"Expected ~24h repeat timer; got delays: {scheduled_delays}" + assert any(d <= 5 for d in scheduled_delays), f"Expected startup delay timer; got delays: {scheduled_delays}" + assert any(d >= 86400 for d in scheduled_delays), f"Expected ~24h repeat timer; got delays: {scheduled_delays}" # Logic Integration Test (Simulating prediction) def test_predict_passes_empty_cache(client): """Verify that if the satellite is not in cache, no passes are returned.""" - payload = { - "latitude": 51.5, - "longitude": -0.1, - "satellites": ["SATELLITE_NON_EXISTENT"] - } - response = client.post('/satellite/predict', json=payload) + payload = {"latitude": 51.5, "longitude": -0.1, "satellites": ["SATELLITE_NON_EXISTENT"]} + response = client.post("/satellite/predict", json=payload) assert response.status_code == 200 - assert len(response.json['passes']) == 0 + assert len(response.json["passes"]) == 0 diff --git a/utils/bluetooth/heuristics.py b/utils/bluetooth/heuristics.py index ff8e41b..3028653 100644 --- a/utils/bluetooth/heuristics.py +++ b/utils/bluetooth/heuristics.py @@ -30,12 +30,15 @@ class HeuristicsEngine: - has_random_address: Uses privacy-preserving random address """ - def evaluate(self, device: BTDeviceAggregate) -> None: + def evaluate(self, device: BTDeviceAggregate) -> BTDeviceAggregate: """ Evaluate all heuristics for a device and update its flags. Args: device: The BTDeviceAggregate to evaluate. + + Returns: + The same device instance with updated heuristic flags. """ # Note: is_new and has_random_address are set by the aggregator # Here we evaluate the behavioral heuristics @@ -43,6 +46,7 @@ class HeuristicsEngine: device.is_persistent = self._check_persistent(device) device.is_beacon_like = self._check_beacon_like(device) device.is_strong_stable = self._check_strong_stable(device) + return device def _check_persistent(self, device: BTDeviceAggregate) -> bool: """ @@ -134,45 +138,36 @@ class HeuristicsEngine: Returns: Dictionary with heuristic flags and explanations. """ - summary = { - 'flags': [], - 'details': {} - } + summary = {"flags": [], "details": {}} if device.is_new: - summary['flags'].append('new') - summary['details']['new'] = 'Device appeared after baseline was set' + summary["flags"].append("new") + summary["details"]["new"] = "Device appeared after baseline was set" if device.is_persistent: - summary['flags'].append('persistent') - summary['details']['persistent'] = ( - f'Seen {device.seen_count} times over ' - f'{device.duration_seconds:.0f}s ({device.seen_rate:.1f}/min)' + summary["flags"].append("persistent") + summary["details"]["persistent"] = ( + f"Seen {device.seen_count} times over {device.duration_seconds:.0f}s ({device.seen_rate:.1f}/min)" ) if device.is_beacon_like: - summary['flags'].append('beacon_like') + summary["flags"].append("beacon_like") intervals = self._calculate_intervals(device) if intervals: mean_int = statistics.mean(intervals) - summary['details']['beacon_like'] = ( - f'Regular advertising interval (~{mean_int:.1f}s)' - ) + summary["details"]["beacon_like"] = f"Regular advertising interval (~{mean_int:.1f}s)" else: - summary['details']['beacon_like'] = 'Regular advertising pattern' + summary["details"]["beacon_like"] = "Regular advertising pattern" if device.is_strong_stable: - summary['flags'].append('strong_stable') - summary['details']['strong_stable'] = ( - f'Strong signal ({device.rssi_median:.0f} dBm) ' - f'with low variance ({device.rssi_variance:.1f})' + summary["flags"].append("strong_stable") + summary["details"]["strong_stable"] = ( + f"Strong signal ({device.rssi_median:.0f} dBm) with low variance ({device.rssi_variance:.1f})" ) if device.has_random_address: - summary['flags'].append('random_address') - summary['details']['random_address'] = ( - f'Uses {device.address_type} address (privacy-preserving)' - ) + summary["flags"].append("random_address") + summary["details"]["random_address"] = f"Uses {device.address_type} address (privacy-preserving)" return summary