mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 22:59:59 -07:00
- Fix SSE fanout thread AttributeError when source queue is None during interpreter shutdown by snapshotting to local variable with null guard - Fix branded "i" logo rendering oversized on first page load (FOUC) by adding inline width/height to SVG elements across 10 templates - Bump version to 2.26.0 in config.py, pyproject.toml, and CHANGELOG.md Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
316 lines
12 KiB
Python
316 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Smoke Test for Bluetooth API Backwards Compatibility
|
|
|
|
Run this script against a running INTERCEPT server to verify:
|
|
1. Existing v1/v2 endpoints still work
|
|
2. New tracker endpoints work
|
|
3. TSCM integration is not broken
|
|
4. JSON schemas are compatible
|
|
|
|
Usage:
|
|
python tests/smoke_test_bluetooth.py [--host HOST] [--port PORT]
|
|
|
|
Requirements:
|
|
- INTERCEPT server must be running
|
|
- requests library: pip install requests
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
print("Error: requests library required. Install with: pip install requests")
|
|
sys.exit(1)
|
|
|
|
|
|
# =============================================================================
|
|
# TEST CONFIGURATION
|
|
# =============================================================================
|
|
|
|
DEFAULT_HOST = "127.0.0.1"
|
|
DEFAULT_PORT = 5000
|
|
|
|
|
|
# =============================================================================
|
|
# SCHEMA VALIDATORS
|
|
# =============================================================================
|
|
|
|
def validate_device_schema(device: dict, context: str = "") -> list[str]:
|
|
"""Validate that a device dict has expected fields (backwards compatible)."""
|
|
errors = []
|
|
required_fields = [
|
|
'device_id', 'address', 'rssi_current', 'last_seen', 'seen_count'
|
|
]
|
|
|
|
for field in required_fields:
|
|
if field not in device:
|
|
errors.append(f"{context}Missing required field: {field}")
|
|
|
|
# New tracker fields should be present (v2) but are optional
|
|
tracker_fields = ['is_tracker', 'tracker_type', 'tracker_confidence']
|
|
for field in tracker_fields:
|
|
if field in device:
|
|
# Field exists, check type
|
|
if field == 'is_tracker' and not isinstance(device[field], bool):
|
|
errors.append(f"{context}is_tracker should be bool, got {type(device[field])}")
|
|
|
|
return errors
|
|
|
|
|
|
def validate_tracker_schema(tracker: dict, context: str = "") -> list[str]:
|
|
"""Validate tracker endpoint response schema."""
|
|
errors = []
|
|
|
|
required_fields = [
|
|
'device_id', 'address', 'tracker'
|
|
]
|
|
for field in required_fields:
|
|
if field not in tracker:
|
|
errors.append(f"{context}Missing required field: {field}")
|
|
|
|
# Tracker sub-object
|
|
if 'tracker' in tracker:
|
|
tracker_obj = tracker['tracker']
|
|
tracker_required = ['type', 'confidence', 'evidence']
|
|
for field in tracker_required:
|
|
if field not in tracker_obj:
|
|
errors.append(f"{context}tracker.{field} missing")
|
|
|
|
return errors
|
|
|
|
|
|
def validate_diagnostics_schema(diagnostics: dict) -> list[str]:
|
|
"""Validate diagnostics endpoint response schema."""
|
|
errors = []
|
|
|
|
required_sections = ['system', 'bluez', 'adapters', 'permissions', 'backends']
|
|
for section in required_sections:
|
|
if section not in diagnostics:
|
|
errors.append(f"Missing diagnostics section: {section}")
|
|
|
|
if 'can_scan' not in diagnostics:
|
|
errors.append("Missing can_scan field")
|
|
|
|
return errors
|
|
|
|
|
|
# =============================================================================
|
|
# TEST CASES
|
|
# =============================================================================
|
|
|
|
class SmokeTests:
|
|
"""Smoke test runner."""
|
|
|
|
def __init__(self, base_url: str):
|
|
self.base_url = base_url
|
|
self.passed = 0
|
|
self.failed = 0
|
|
self.errors = []
|
|
|
|
def _check(self, name: str, condition: bool, error_msg: str = ""):
|
|
"""Record a test result."""
|
|
if condition:
|
|
print(f" [PASS] {name}")
|
|
self.passed += 1
|
|
else:
|
|
print(f" [FAIL] {name}: {error_msg}")
|
|
self.failed += 1
|
|
self.errors.append(f"{name}: {error_msg}")
|
|
|
|
def test_capabilities_endpoint(self):
|
|
"""Test GET /api/bluetooth/capabilities"""
|
|
print("\n=== Test: Capabilities Endpoint ===")
|
|
try:
|
|
resp = requests.get(f"{self.base_url}/api/bluetooth/capabilities", timeout=5)
|
|
self._check("Status code 200", resp.status_code == 200, f"Got {resp.status_code}")
|
|
|
|
data = resp.json()
|
|
self._check("Has 'available' field", 'available' in data or 'can_scan' in data)
|
|
self._check("Has 'adapters' field", 'adapters' in data)
|
|
self._check("Has 'recommended_backend' field", 'recommended_backend' in data or 'preferred_backend' in data)
|
|
|
|
except requests.RequestException as e:
|
|
self._check("Request succeeded", False, str(e))
|
|
|
|
def test_devices_endpoint(self):
|
|
"""Test GET /api/bluetooth/devices (backwards compatibility)"""
|
|
print("\n=== Test: Devices Endpoint (v2) ===")
|
|
try:
|
|
resp = requests.get(f"{self.base_url}/api/bluetooth/devices", timeout=5)
|
|
self._check("Status code 200", resp.status_code == 200, f"Got {resp.status_code}")
|
|
|
|
data = resp.json()
|
|
self._check("Has 'count' field", 'count' in data)
|
|
self._check("Has 'devices' array", 'devices' in data and isinstance(data['devices'], list))
|
|
|
|
# If devices exist, validate schema
|
|
if data.get('devices'):
|
|
device = data['devices'][0]
|
|
errors = validate_device_schema(device, "First device: ")
|
|
self._check("Device schema valid", len(errors) == 0, "; ".join(errors))
|
|
|
|
# Check for new tracker fields (should exist even if empty)
|
|
self._check("Has tracker fields", 'is_tracker' in device,
|
|
"New tracker field missing (backwards compat issue)")
|
|
|
|
except requests.RequestException as e:
|
|
self._check("Request succeeded", False, str(e))
|
|
|
|
def test_trackers_endpoint(self):
|
|
"""Test GET /api/bluetooth/trackers (new v2 endpoint)"""
|
|
print("\n=== Test: Trackers Endpoint (NEW) ===")
|
|
try:
|
|
resp = requests.get(f"{self.base_url}/api/bluetooth/trackers", timeout=5)
|
|
self._check("Status code 200", resp.status_code == 200, f"Got {resp.status_code}")
|
|
|
|
data = resp.json()
|
|
self._check("Has 'count' field", 'count' in data)
|
|
self._check("Has 'trackers' array", 'trackers' in data and isinstance(data['trackers'], list))
|
|
self._check("Has 'summary' field", 'summary' in data)
|
|
|
|
# If trackers exist, validate schema
|
|
if data.get('trackers'):
|
|
tracker = data['trackers'][0]
|
|
errors = validate_tracker_schema(tracker, "First tracker: ")
|
|
self._check("Tracker schema valid", len(errors) == 0, "; ".join(errors))
|
|
|
|
except requests.RequestException as e:
|
|
self._check("Request succeeded", False, str(e))
|
|
|
|
def test_diagnostics_endpoint(self):
|
|
"""Test GET /api/bluetooth/diagnostics (new endpoint)"""
|
|
print("\n=== Test: Diagnostics Endpoint (NEW) ===")
|
|
try:
|
|
resp = requests.get(f"{self.base_url}/api/bluetooth/diagnostics", timeout=5)
|
|
self._check("Status code 200", resp.status_code == 200, f"Got {resp.status_code}")
|
|
|
|
data = resp.json()
|
|
errors = validate_diagnostics_schema(data)
|
|
self._check("Diagnostics schema valid", len(errors) == 0, "; ".join(errors))
|
|
|
|
self._check("Has recommendations", 'recommendations' in data)
|
|
|
|
except requests.RequestException as e:
|
|
self._check("Request succeeded", False, str(e))
|
|
|
|
def test_scan_status_endpoint(self):
|
|
"""Test GET /api/bluetooth/scan/status"""
|
|
print("\n=== Test: Scan Status Endpoint ===")
|
|
try:
|
|
resp = requests.get(f"{self.base_url}/api/bluetooth/scan/status", timeout=5)
|
|
self._check("Status code 200", resp.status_code == 200, f"Got {resp.status_code}")
|
|
|
|
data = resp.json()
|
|
self._check("Has 'is_scanning' field", 'is_scanning' in data)
|
|
|
|
except requests.RequestException as e:
|
|
self._check("Request succeeded", False, str(e))
|
|
|
|
def test_baseline_endpoints(self):
|
|
"""Test baseline management endpoints"""
|
|
print("\n=== Test: Baseline Endpoints ===")
|
|
try:
|
|
# List baselines
|
|
resp = requests.get(f"{self.base_url}/api/bluetooth/baseline/list", timeout=5)
|
|
self._check("List baselines: Status 200", resp.status_code == 200, f"Got {resp.status_code}")
|
|
|
|
data = resp.json()
|
|
self._check("Has 'baselines' array", 'baselines' in data)
|
|
|
|
except requests.RequestException as e:
|
|
self._check("Request succeeded", False, str(e))
|
|
|
|
def test_tscm_integration(self):
|
|
"""Test that TSCM still works with Bluetooth"""
|
|
print("\n=== Test: TSCM Integration ===")
|
|
try:
|
|
# Get TSCM sweep presets
|
|
resp = requests.get(f"{self.base_url}/tscm/devices", timeout=5)
|
|
# This might 404 if no devices, which is ok
|
|
self._check("TSCM devices endpoint accessible", resp.status_code in (200, 404))
|
|
|
|
except requests.RequestException as e:
|
|
self._check("Request succeeded", False, str(e))
|
|
|
|
def test_export_endpoint(self):
|
|
"""Test GET /api/bluetooth/export"""
|
|
print("\n=== Test: Export Endpoint ===")
|
|
try:
|
|
# JSON export
|
|
resp = requests.get(f"{self.base_url}/api/bluetooth/export?format=json", timeout=5)
|
|
self._check("JSON export: Status 200", resp.status_code == 200, f"Got {resp.status_code}")
|
|
self._check("JSON export: Content-Type", 'application/json' in resp.headers.get('Content-Type', ''))
|
|
|
|
# CSV export
|
|
resp = requests.get(f"{self.base_url}/api/bluetooth/export?format=csv", timeout=5)
|
|
self._check("CSV export: Status 200", resp.status_code == 200, f"Got {resp.status_code}")
|
|
self._check("CSV export: Content-Type", 'text/csv' in resp.headers.get('Content-Type', ''))
|
|
|
|
except requests.RequestException as e:
|
|
self._check("Request succeeded", False, str(e))
|
|
|
|
def run_all(self):
|
|
"""Run all smoke tests."""
|
|
print(f"\n{'='*60}")
|
|
print("BLUETOOTH API SMOKE TESTS")
|
|
print(f"Target: {self.base_url}")
|
|
print(f"{'='*60}")
|
|
|
|
self.test_capabilities_endpoint()
|
|
self.test_devices_endpoint()
|
|
self.test_trackers_endpoint()
|
|
self.test_diagnostics_endpoint()
|
|
self.test_scan_status_endpoint()
|
|
self.test_baseline_endpoints()
|
|
self.test_export_endpoint()
|
|
self.test_tscm_integration()
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"RESULTS: {self.passed} passed, {self.failed} failed")
|
|
print(f"{'='*60}")
|
|
|
|
if self.errors:
|
|
print("\nFailed tests:")
|
|
for error in self.errors:
|
|
print(f" - {error}")
|
|
|
|
return self.failed == 0
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN
|
|
# =============================================================================
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Bluetooth API smoke tests")
|
|
parser.add_argument("--host", default=DEFAULT_HOST, help="Server host")
|
|
parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Server port")
|
|
args = parser.parse_args()
|
|
|
|
base_url = f"http://{args.host}:{args.port}"
|
|
|
|
# Check server is reachable
|
|
print(f"Checking server at {base_url}...")
|
|
try:
|
|
resp = requests.get(f"{base_url}/api/bluetooth/capabilities", timeout=5)
|
|
print(f"Server responded: {resp.status_code}")
|
|
except requests.RequestException as e:
|
|
print(f"ERROR: Cannot reach server at {base_url}")
|
|
print(f"Details: {e}")
|
|
print("\nMake sure INTERCEPT is running:")
|
|
print(" cd /path/to/intercept && python app.py")
|
|
sys.exit(1)
|
|
|
|
# Run tests
|
|
tests = SmokeTests(base_url)
|
|
success = tests.run_all()
|
|
|
|
sys.exit(0 if success else 1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|