mirror of
https://github.com/smittix/intercept.git
synced 2026-06-14 16:43:38 -07:00
Overhaul Bluetooth scanning with DBus-based BlueZ integration
Major changes: - Add utils/bluetooth/ package with DBus scanner, fallback scanners (bleak, hcitool, bluetoothctl), device aggregation, and heuristics - New unified API at /api/bluetooth/ with REST endpoints and SSE streaming - Device observation aggregation with RSSI statistics and range bands - Behavioral heuristics: new, persistent, beacon-like, strong+stable - Frontend components: DeviceCard, MessageCard, RSSISparkline - TSCM integration via get_tscm_bluetooth_snapshot() helper - Unit tests for aggregator, heuristics, and API endpoints Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,555 @@
|
||||
"""Unit tests for Bluetooth device aggregation."""
|
||||
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from utils.bluetooth.aggregator import DeviceAggregator
|
||||
from utils.bluetooth.models import BTObservation, BTDeviceAggregate
|
||||
from utils.bluetooth.constants import (
|
||||
RSSI_RANGE_BANDS,
|
||||
MAX_RSSI_SAMPLES,
|
||||
DEVICE_STALE_SECONDS,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def aggregator():
|
||||
"""Create a fresh DeviceAggregator for testing."""
|
||||
return DeviceAggregator()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_observation():
|
||||
"""Create a sample BLE observation."""
|
||||
return BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
rssi=-55,
|
||||
tx_power=None,
|
||||
name="Test Device",
|
||||
manufacturer_id=76, # Apple
|
||||
manufacturer_data=None,
|
||||
service_uuids=["0000180f-0000-1000-8000-00805f9b34fb"],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
|
||||
|
||||
class TestDeviceAggregator:
|
||||
"""Tests for DeviceAggregator class."""
|
||||
|
||||
def test_ingest_single_observation(self, aggregator, sample_observation):
|
||||
"""Test ingesting a single observation creates device aggregate."""
|
||||
aggregator.ingest(sample_observation)
|
||||
|
||||
devices = aggregator.get_all_devices()
|
||||
assert len(devices) == 1
|
||||
|
||||
device = devices[0]
|
||||
assert device.address == "AA:BB:CC:DD:EE:FF"
|
||||
assert device.name == "Test Device"
|
||||
assert device.rssi_current == -55
|
||||
assert device.seen_count == 1
|
||||
|
||||
def test_ingest_multiple_observations_same_device(self, aggregator, sample_observation):
|
||||
"""Test multiple observations for same device aggregate correctly."""
|
||||
# Ingest multiple observations with varying RSSI
|
||||
rssi_values = [-55, -60, -50, -58, -52]
|
||||
|
||||
for rssi in rssi_values:
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address=sample_observation.address,
|
||||
address_type=sample_observation.address_type,
|
||||
rssi=rssi,
|
||||
tx_power=None,
|
||||
name=sample_observation.name,
|
||||
manufacturer_id=sample_observation.manufacturer_id,
|
||||
manufacturer_data=None,
|
||||
service_uuids=sample_observation.service_uuids,
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
devices = aggregator.get_all_devices()
|
||||
assert len(devices) == 1
|
||||
|
||||
device = devices[0]
|
||||
assert device.seen_count == 5
|
||||
assert device.rssi_current == rssi_values[-1]
|
||||
assert len(device.rssi_samples) == 5
|
||||
|
||||
# Check RSSI stats
|
||||
assert device.rssi_min == -60
|
||||
assert device.rssi_max == -50
|
||||
|
||||
def test_rssi_median_calculation(self, aggregator, sample_observation):
|
||||
"""Test RSSI median is calculated correctly."""
|
||||
rssi_values = [-70, -60, -50, -55, -65] # Sorted: -70, -65, -60, -55, -50 -> median -60
|
||||
|
||||
for rssi in rssi_values:
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address=sample_observation.address,
|
||||
address_type="public",
|
||||
rssi=rssi,
|
||||
tx_power=None,
|
||||
name="Test",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
device = aggregator.get_all_devices()[0]
|
||||
assert device.rssi_median == -60.0
|
||||
|
||||
def test_rssi_samples_limited(self, aggregator, sample_observation):
|
||||
"""Test RSSI samples are limited to MAX_RSSI_SAMPLES."""
|
||||
for i in range(MAX_RSSI_SAMPLES + 50):
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address=sample_observation.address,
|
||||
address_type="public",
|
||||
rssi=-50 - (i % 30),
|
||||
tx_power=None,
|
||||
name="Test",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
device = aggregator.get_all_devices()[0]
|
||||
assert len(device.rssi_samples) <= MAX_RSSI_SAMPLES
|
||||
|
||||
def test_protocol_detection_ble(self, aggregator):
|
||||
"""Test BLE protocol detection."""
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="random", # Random address indicates BLE
|
||||
rssi=-60,
|
||||
tx_power=-8,
|
||||
name="BLE Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=["0000180a-0000-1000-8000-00805f9b34fb"],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
device = aggregator.get_all_devices()[0]
|
||||
assert device.protocol == "ble"
|
||||
|
||||
def test_protocol_detection_classic(self, aggregator):
|
||||
"""Test Classic Bluetooth protocol detection."""
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
rssi=-60,
|
||||
tx_power=None,
|
||||
name="Classic Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=0x240404, # Audio device
|
||||
major_class="audio_video",
|
||||
minor_class="headphones",
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
device = aggregator.get_all_devices()[0]
|
||||
assert device.protocol == "classic"
|
||||
|
||||
|
||||
class TestRangeBandEstimation:
|
||||
"""Tests for range band estimation."""
|
||||
|
||||
def test_range_band_very_close(self, aggregator):
|
||||
"""Test very close range band detection."""
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
rssi=-35, # Very strong signal
|
||||
tx_power=None,
|
||||
name="Close Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
|
||||
# Add multiple samples to build confidence
|
||||
for _ in range(10):
|
||||
aggregator.ingest(obs)
|
||||
|
||||
device = aggregator.get_all_devices()[0]
|
||||
assert device.range_band == "very_close"
|
||||
|
||||
def test_range_band_close(self, aggregator):
|
||||
"""Test close range band detection."""
|
||||
for rssi in [-45, -48, -50, -47, -49]:
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
rssi=rssi,
|
||||
tx_power=None,
|
||||
name="Close Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
device = aggregator.get_all_devices()[0]
|
||||
assert device.range_band in ["very_close", "close"]
|
||||
|
||||
def test_range_band_far(self, aggregator):
|
||||
"""Test far range band detection."""
|
||||
for rssi in [-75, -78, -80, -77, -79]:
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
rssi=rssi,
|
||||
tx_power=None,
|
||||
name="Far Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
device = aggregator.get_all_devices()[0]
|
||||
assert device.range_band in ["nearby", "far"]
|
||||
|
||||
def test_range_band_unknown_low_confidence(self, aggregator):
|
||||
"""Test unknown range band with insufficient data."""
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
rssi=-60,
|
||||
tx_power=None,
|
||||
name="Unknown Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
device = aggregator.get_all_devices()[0]
|
||||
# With only one sample, confidence is low
|
||||
assert device.rssi_confidence < 0.5
|
||||
|
||||
|
||||
class TestBaselineManagement:
|
||||
"""Tests for baseline functionality."""
|
||||
|
||||
def test_set_baseline(self, aggregator, sample_observation):
|
||||
"""Test setting a baseline from current devices."""
|
||||
aggregator.ingest(sample_observation)
|
||||
count = aggregator.set_baseline()
|
||||
|
||||
assert count == 1
|
||||
assert aggregator.has_baseline()
|
||||
|
||||
def test_clear_baseline(self, aggregator, sample_observation):
|
||||
"""Test clearing the baseline."""
|
||||
aggregator.ingest(sample_observation)
|
||||
aggregator.set_baseline()
|
||||
aggregator.clear_baseline()
|
||||
|
||||
assert not aggregator.has_baseline()
|
||||
|
||||
def test_is_new_device(self, aggregator, sample_observation):
|
||||
"""Test detection of new devices vs baseline."""
|
||||
# Add first device and set baseline
|
||||
aggregator.ingest(sample_observation)
|
||||
aggregator.set_baseline()
|
||||
|
||||
# Add new device
|
||||
new_obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="11:22:33:44:55:66",
|
||||
address_type="public",
|
||||
rssi=-60,
|
||||
tx_power=None,
|
||||
name="New Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(new_obs)
|
||||
|
||||
devices = aggregator.get_all_devices()
|
||||
new_device = next(d for d in devices if d.address == "11:22:33:44:55:66")
|
||||
|
||||
assert new_device.is_new is True
|
||||
|
||||
# Original device should not be new
|
||||
original = next(d for d in devices if d.address == sample_observation.address)
|
||||
assert original.is_new is False
|
||||
|
||||
|
||||
class TestDevicePruning:
|
||||
"""Tests for stale device pruning."""
|
||||
|
||||
def test_prune_stale_devices(self, aggregator):
|
||||
"""Test that stale devices are removed."""
|
||||
# Create an old observation
|
||||
old_time = datetime.now() - timedelta(seconds=DEVICE_STALE_SECONDS + 60)
|
||||
old_obs = BTObservation(
|
||||
timestamp=old_time,
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
rssi=-60,
|
||||
tx_power=None,
|
||||
name="Old Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(old_obs)
|
||||
|
||||
# Create a recent observation for different device
|
||||
recent_obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="11:22:33:44:55:66",
|
||||
address_type="public",
|
||||
rssi=-55,
|
||||
tx_power=None,
|
||||
name="Recent Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(recent_obs)
|
||||
|
||||
# Prune stale devices
|
||||
pruned = aggregator.prune_stale()
|
||||
|
||||
assert pruned == 1
|
||||
devices = aggregator.get_all_devices()
|
||||
assert len(devices) == 1
|
||||
assert devices[0].address == "11:22:33:44:55:66"
|
||||
|
||||
|
||||
class TestDeviceFiltering:
|
||||
"""Tests for device filtering and sorting."""
|
||||
|
||||
def test_filter_by_protocol(self, aggregator):
|
||||
"""Test filtering devices by protocol."""
|
||||
# Add BLE device
|
||||
ble_obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="random",
|
||||
rssi=-60,
|
||||
tx_power=-8,
|
||||
name="BLE Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=["0000180a-0000-1000-8000-00805f9b34fb"],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(ble_obs)
|
||||
|
||||
# Add Classic device
|
||||
classic_obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address="11:22:33:44:55:66",
|
||||
address_type="public",
|
||||
rssi=-55,
|
||||
tx_power=None,
|
||||
name="Classic Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=0x240404,
|
||||
major_class="audio_video",
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(classic_obs)
|
||||
|
||||
# Filter by BLE
|
||||
ble_devices = aggregator.get_all_devices(protocol="ble")
|
||||
assert len(ble_devices) == 1
|
||||
assert ble_devices[0].protocol == "ble"
|
||||
|
||||
# Filter by Classic
|
||||
classic_devices = aggregator.get_all_devices(protocol="classic")
|
||||
assert len(classic_devices) == 1
|
||||
assert classic_devices[0].protocol == "classic"
|
||||
|
||||
def test_filter_by_min_rssi(self, aggregator):
|
||||
"""Test filtering devices by minimum RSSI."""
|
||||
for i, rssi in enumerate([-50, -70, -90]):
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address=f"AA:BB:CC:DD:EE:{i:02X}",
|
||||
address_type="public",
|
||||
rssi=rssi,
|
||||
tx_power=None,
|
||||
name=f"Device {i}",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
# Filter by min RSSI -60
|
||||
strong_devices = aggregator.get_all_devices(min_rssi=-60)
|
||||
assert len(strong_devices) == 1
|
||||
assert strong_devices[0].rssi_current == -50
|
||||
|
||||
def test_sort_by_rssi(self, aggregator):
|
||||
"""Test sorting devices by RSSI."""
|
||||
for rssi in [-70, -50, -90, -60]:
|
||||
obs = BTObservation(
|
||||
timestamp=datetime.now(),
|
||||
address=f"AA:BB:CC:DD:{abs(rssi):02X}:FF",
|
||||
address_type="public",
|
||||
rssi=rssi,
|
||||
tx_power=None,
|
||||
name=f"Device RSSI {rssi}",
|
||||
manufacturer_id=None,
|
||||
manufacturer_data=None,
|
||||
service_uuids=[],
|
||||
service_data={},
|
||||
appearance=None,
|
||||
is_connectable=True,
|
||||
is_paired=False,
|
||||
is_connected=False,
|
||||
class_of_device=None,
|
||||
major_class=None,
|
||||
minor_class=None,
|
||||
)
|
||||
aggregator.ingest(obs)
|
||||
|
||||
# Sort by RSSI (strongest first)
|
||||
devices = aggregator.get_all_devices(sort_by="rssi")
|
||||
rssi_values = [d.rssi_current for d in devices]
|
||||
assert rssi_values == [-50, -60, -70, -90]
|
||||
@@ -0,0 +1,469 @@
|
||||
"""API endpoint tests for Bluetooth v2 routes."""
|
||||
|
||||
import pytest
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch, PropertyMock
|
||||
from datetime import datetime
|
||||
from flask import Flask
|
||||
|
||||
from routes.bluetooth_v2 import bluetooth_v2_bp
|
||||
from utils.bluetooth.models import BTDeviceAggregate, ScanStatus, SystemCapabilities
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app():
|
||||
"""Create Flask application for testing."""
|
||||
app = Flask(__name__)
|
||||
app.register_blueprint(bluetooth_v2_bp)
|
||||
app.config['TESTING'] = True
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(app):
|
||||
"""Create test client."""
|
||||
return app.test_client()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_scanner():
|
||||
"""Create mock BluetoothScanner."""
|
||||
with patch('routes.bluetooth_v2.get_bluetooth_scanner') as mock_get:
|
||||
scanner = MagicMock()
|
||||
scanner.is_scanning = False
|
||||
scanner.scan_mode = None
|
||||
scanner.scan_start_time = None
|
||||
scanner.device_count = 0
|
||||
mock_get.return_value = scanner
|
||||
yield scanner
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_device():
|
||||
"""Create sample BTDeviceAggregate."""
|
||||
return BTDeviceAggregate(
|
||||
device_id="AA:BB:CC:DD:EE:FF:public",
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
protocol="ble",
|
||||
first_seen=datetime.now(),
|
||||
last_seen=datetime.now(),
|
||||
seen_count=5,
|
||||
seen_rate=1.0,
|
||||
rssi_samples=[],
|
||||
rssi_current=-55,
|
||||
rssi_median=-57.0,
|
||||
rssi_min=-60,
|
||||
rssi_max=-50,
|
||||
rssi_variance=4.0,
|
||||
rssi_confidence=0.85,
|
||||
range_band="close",
|
||||
range_confidence=0.75,
|
||||
name="Test Device",
|
||||
manufacturer_id=76,
|
||||
manufacturer_name="Apple, Inc.",
|
||||
manufacturer_bytes=None,
|
||||
service_uuids=["0000180f-0000-1000-8000-00805f9b34fb"],
|
||||
is_new=False,
|
||||
is_persistent=True,
|
||||
is_beacon_like=False,
|
||||
is_strong_stable=True,
|
||||
has_random_address=False,
|
||||
)
|
||||
|
||||
|
||||
class TestScanEndpoints:
|
||||
"""Tests for scan control endpoints."""
|
||||
|
||||
def test_start_scan_success(self, client, mock_scanner):
|
||||
"""Test starting a scan successfully."""
|
||||
mock_scanner.start_scan.return_value = True
|
||||
mock_scanner.scan_mode = "dbus"
|
||||
|
||||
response = client.post('/api/bluetooth/scan/start',
|
||||
json={'mode': 'auto', 'duration_s': 30})
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['status'] == 'started'
|
||||
mock_scanner.start_scan.assert_called_once()
|
||||
|
||||
def test_start_scan_already_scanning(self, client, mock_scanner):
|
||||
"""Test starting scan when already scanning."""
|
||||
mock_scanner.is_scanning = True
|
||||
|
||||
response = client.post('/api/bluetooth/scan/start', json={})
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['status'] == 'already_scanning'
|
||||
|
||||
def test_start_scan_failed(self, client, mock_scanner):
|
||||
"""Test start scan failure."""
|
||||
mock_scanner.start_scan.return_value = False
|
||||
|
||||
response = client.post('/api/bluetooth/scan/start', json={})
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['status'] == 'error'
|
||||
|
||||
def test_stop_scan_success(self, client, mock_scanner):
|
||||
"""Test stopping a scan."""
|
||||
mock_scanner.is_scanning = True
|
||||
|
||||
response = client.post('/api/bluetooth/scan/stop')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['status'] == 'stopped'
|
||||
mock_scanner.stop_scan.assert_called_once()
|
||||
|
||||
def test_get_scan_status(self, client, mock_scanner):
|
||||
"""Test getting scan status."""
|
||||
mock_scanner.is_scanning = True
|
||||
mock_scanner.scan_mode = "dbus"
|
||||
mock_scanner.device_count = 10
|
||||
mock_scanner.get_baseline_count.return_value = 5
|
||||
|
||||
response = client.get('/api/bluetooth/scan/status')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['is_scanning'] is True
|
||||
assert data['mode'] == 'dbus'
|
||||
assert data['device_count'] == 10
|
||||
|
||||
|
||||
class TestDeviceEndpoints:
|
||||
"""Tests for device listing and detail endpoints."""
|
||||
|
||||
def test_list_devices(self, client, mock_scanner, sample_device):
|
||||
"""Test listing all devices."""
|
||||
mock_scanner.get_devices.return_value = [sample_device]
|
||||
|
||||
response = client.get('/api/bluetooth/devices')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert len(data['devices']) == 1
|
||||
assert data['devices'][0]['address'] == 'AA:BB:CC:DD:EE:FF'
|
||||
|
||||
def test_list_devices_with_filters(self, client, mock_scanner, sample_device):
|
||||
"""Test listing devices with filters."""
|
||||
mock_scanner.get_devices.return_value = [sample_device]
|
||||
|
||||
response = client.get('/api/bluetooth/devices?protocol=ble&min_rssi=-60&sort_by=rssi')
|
||||
|
||||
assert response.status_code == 200
|
||||
mock_scanner.get_devices.assert_called_with(
|
||||
sort_by='rssi',
|
||||
protocol='ble',
|
||||
min_rssi=-60,
|
||||
new_only=False,
|
||||
)
|
||||
|
||||
def test_list_devices_new_only(self, client, mock_scanner, sample_device):
|
||||
"""Test listing only new devices."""
|
||||
sample_device.is_new = True
|
||||
mock_scanner.get_devices.return_value = [sample_device]
|
||||
|
||||
response = client.get('/api/bluetooth/devices?new_only=true')
|
||||
|
||||
assert response.status_code == 200
|
||||
mock_scanner.get_devices.assert_called_with(
|
||||
sort_by='last_seen',
|
||||
protocol=None,
|
||||
min_rssi=None,
|
||||
new_only=True,
|
||||
)
|
||||
|
||||
def test_get_device_detail(self, client, mock_scanner, sample_device):
|
||||
"""Test getting device details."""
|
||||
mock_scanner.get_device.return_value = sample_device
|
||||
|
||||
response = client.get('/api/bluetooth/devices/AA:BB:CC:DD:EE:FF:public')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['address'] == 'AA:BB:CC:DD:EE:FF'
|
||||
assert data['manufacturer_name'] == 'Apple, Inc.'
|
||||
|
||||
def test_get_device_not_found(self, client, mock_scanner):
|
||||
"""Test getting non-existent device."""
|
||||
mock_scanner.get_device.return_value = None
|
||||
|
||||
response = client.get('/api/bluetooth/devices/NONEXISTENT')
|
||||
|
||||
assert response.status_code == 404
|
||||
data = response.get_json()
|
||||
assert data['status'] == 'error'
|
||||
|
||||
|
||||
class TestBaselineEndpoints:
|
||||
"""Tests for baseline management endpoints."""
|
||||
|
||||
def test_set_baseline(self, client, mock_scanner):
|
||||
"""Test setting baseline."""
|
||||
mock_scanner.set_baseline.return_value = 15
|
||||
|
||||
response = client.post('/api/bluetooth/baseline/set')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['status'] == 'success'
|
||||
assert data['device_count'] == 15
|
||||
|
||||
def test_clear_baseline(self, client, mock_scanner):
|
||||
"""Test clearing baseline."""
|
||||
response = client.post('/api/bluetooth/baseline/clear')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['status'] == 'success'
|
||||
mock_scanner.clear_baseline.assert_called_once()
|
||||
|
||||
|
||||
class TestCapabilitiesEndpoint:
|
||||
"""Tests for capabilities check endpoint."""
|
||||
|
||||
def test_get_capabilities(self, client):
|
||||
"""Test getting system capabilities."""
|
||||
mock_caps = SystemCapabilities(
|
||||
available=True,
|
||||
dbus_available=True,
|
||||
bluez_version="5.66",
|
||||
adapters=[],
|
||||
has_root=True,
|
||||
rfkill_blocked=False,
|
||||
fallback_tools=['bleak', 'hcitool'],
|
||||
issues=[],
|
||||
preferred_backend='dbus',
|
||||
)
|
||||
|
||||
with patch('routes.bluetooth_v2.check_bluetooth_capabilities', return_value=mock_caps):
|
||||
response = client.get('/api/bluetooth/capabilities')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['available'] is True
|
||||
assert data['dbus_available'] is True
|
||||
|
||||
def test_capabilities_not_available(self, client):
|
||||
"""Test capabilities when Bluetooth not available."""
|
||||
mock_caps = SystemCapabilities(
|
||||
available=False,
|
||||
dbus_available=False,
|
||||
bluez_version=None,
|
||||
adapters=[],
|
||||
has_root=False,
|
||||
rfkill_blocked=False,
|
||||
fallback_tools=[],
|
||||
issues=['No Bluetooth adapter found'],
|
||||
preferred_backend=None,
|
||||
)
|
||||
|
||||
with patch('routes.bluetooth_v2.check_bluetooth_capabilities', return_value=mock_caps):
|
||||
response = client.get('/api/bluetooth/capabilities')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['available'] is False
|
||||
assert 'No Bluetooth adapter found' in data['issues']
|
||||
|
||||
|
||||
class TestExportEndpoint:
|
||||
"""Tests for data export endpoint."""
|
||||
|
||||
def test_export_json(self, client, mock_scanner, sample_device):
|
||||
"""Test JSON export."""
|
||||
mock_scanner.get_devices.return_value = [sample_device]
|
||||
|
||||
response = client.get('/api/bluetooth/export?format=json')
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.content_type == 'application/json'
|
||||
data = response.get_json()
|
||||
assert 'devices' in data
|
||||
assert 'timestamp' in data
|
||||
|
||||
def test_export_csv(self, client, mock_scanner, sample_device):
|
||||
"""Test CSV export."""
|
||||
mock_scanner.get_devices.return_value = [sample_device]
|
||||
|
||||
response = client.get('/api/bluetooth/export?format=csv')
|
||||
|
||||
assert response.status_code == 200
|
||||
assert 'text/csv' in response.content_type
|
||||
|
||||
# Check CSV content
|
||||
csv_content = response.data.decode('utf-8')
|
||||
assert 'address' in csv_content.lower()
|
||||
assert 'AA:BB:CC:DD:EE:FF' in csv_content
|
||||
|
||||
def test_export_empty_devices(self, client, mock_scanner):
|
||||
"""Test export with no devices."""
|
||||
mock_scanner.get_devices.return_value = []
|
||||
|
||||
response = client.get('/api/bluetooth/export?format=json')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['devices'] == []
|
||||
|
||||
|
||||
class TestStreamEndpoint:
|
||||
"""Tests for SSE streaming endpoint."""
|
||||
|
||||
def test_stream_headers(self, client, mock_scanner):
|
||||
"""Test SSE stream has correct headers."""
|
||||
mock_scanner.stream_events.return_value = iter([])
|
||||
|
||||
response = client.get('/api/bluetooth/stream')
|
||||
|
||||
assert response.content_type == 'text/event-stream'
|
||||
assert response.headers.get('Cache-Control') == 'no-cache'
|
||||
|
||||
def test_stream_returns_generator(self, client, mock_scanner):
|
||||
"""Test stream endpoint returns a generator response."""
|
||||
mock_scanner.stream_events.return_value = iter([
|
||||
{'event': 'device_update', 'data': {'address': 'AA:BB:CC:DD:EE:FF'}}
|
||||
])
|
||||
|
||||
response = client.get('/api/bluetooth/stream')
|
||||
|
||||
# Should be a streaming response
|
||||
assert response.is_streamed is True
|
||||
|
||||
|
||||
class TestTSCMIntegration:
|
||||
"""Tests for TSCM integration helper."""
|
||||
|
||||
def test_get_tscm_bluetooth_snapshot(self, mock_scanner, sample_device):
|
||||
"""Test TSCM snapshot function."""
|
||||
from routes.bluetooth_v2 import get_tscm_bluetooth_snapshot
|
||||
|
||||
mock_scanner.get_devices.return_value = [sample_device]
|
||||
|
||||
with patch('routes.bluetooth_v2.get_bluetooth_scanner', return_value=mock_scanner):
|
||||
devices = get_tscm_bluetooth_snapshot('hci0', duration=8)
|
||||
|
||||
assert len(devices) == 1
|
||||
device = devices[0]
|
||||
# Should be converted to TSCM format
|
||||
assert 'mac' in device
|
||||
assert device['mac'] == 'AA:BB:CC:DD:EE:FF'
|
||||
|
||||
def test_tscm_snapshot_empty(self, mock_scanner):
|
||||
"""Test TSCM snapshot with no devices."""
|
||||
from routes.bluetooth_v2 import get_tscm_bluetooth_snapshot
|
||||
|
||||
mock_scanner.get_devices.return_value = []
|
||||
|
||||
with patch('routes.bluetooth_v2.get_bluetooth_scanner', return_value=mock_scanner):
|
||||
devices = get_tscm_bluetooth_snapshot('hci0')
|
||||
|
||||
assert devices == []
|
||||
|
||||
|
||||
class TestErrorHandling:
|
||||
"""Tests for error handling."""
|
||||
|
||||
def test_invalid_json_body(self, client, mock_scanner):
|
||||
"""Test handling of invalid JSON body."""
|
||||
response = client.post('/api/bluetooth/scan/start',
|
||||
data='not json',
|
||||
content_type='application/json')
|
||||
|
||||
# Should handle gracefully
|
||||
assert response.status_code in [200, 400]
|
||||
|
||||
def test_scanner_exception(self, client, mock_scanner):
|
||||
"""Test handling of scanner exceptions."""
|
||||
mock_scanner.start_scan.side_effect = Exception("Scanner error")
|
||||
|
||||
response = client.post('/api/bluetooth/scan/start', json={})
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['status'] == 'error'
|
||||
assert 'error' in data['message'].lower() or 'Scanner error' in data['message']
|
||||
|
||||
def test_invalid_device_id_format(self, client, mock_scanner):
|
||||
"""Test handling of invalid device ID format."""
|
||||
mock_scanner.get_device.return_value = None
|
||||
|
||||
response = client.get('/api/bluetooth/devices/invalid-id-format')
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
class TestDeviceSerialization:
|
||||
"""Tests for device serialization."""
|
||||
|
||||
def test_device_to_dict_complete(self, sample_device):
|
||||
"""Test device serialization includes all fields."""
|
||||
from routes.bluetooth_v2 import device_to_dict
|
||||
|
||||
result = device_to_dict(sample_device)
|
||||
|
||||
assert result['device_id'] == sample_device.device_id
|
||||
assert result['address'] == sample_device.address
|
||||
assert result['address_type'] == sample_device.address_type
|
||||
assert result['protocol'] == sample_device.protocol
|
||||
assert result['rssi_current'] == sample_device.rssi_current
|
||||
assert result['rssi_median'] == sample_device.rssi_median
|
||||
assert result['range_band'] == sample_device.range_band
|
||||
assert result['is_new'] == sample_device.is_new
|
||||
assert result['is_persistent'] == sample_device.is_persistent
|
||||
assert result['manufacturer_name'] == sample_device.manufacturer_name
|
||||
|
||||
def test_device_to_dict_timestamps(self, sample_device):
|
||||
"""Test device serialization handles timestamps correctly."""
|
||||
from routes.bluetooth_v2 import device_to_dict
|
||||
|
||||
result = device_to_dict(sample_device)
|
||||
|
||||
# Timestamps should be ISO format strings
|
||||
assert isinstance(result['first_seen'], str)
|
||||
assert isinstance(result['last_seen'], str)
|
||||
|
||||
def test_device_to_dict_null_values(self):
|
||||
"""Test device serialization handles null values."""
|
||||
from routes.bluetooth_v2 import device_to_dict
|
||||
|
||||
device = BTDeviceAggregate(
|
||||
device_id="test:public",
|
||||
address="test",
|
||||
address_type="public",
|
||||
protocol="ble",
|
||||
first_seen=datetime.now(),
|
||||
last_seen=datetime.now(),
|
||||
seen_count=1,
|
||||
seen_rate=1.0,
|
||||
rssi_samples=[],
|
||||
rssi_current=None,
|
||||
rssi_median=None,
|
||||
rssi_min=None,
|
||||
rssi_max=None,
|
||||
rssi_variance=None,
|
||||
rssi_confidence=0.0,
|
||||
range_band="unknown",
|
||||
range_confidence=0.0,
|
||||
name=None,
|
||||
manufacturer_id=None,
|
||||
manufacturer_name=None,
|
||||
manufacturer_bytes=None,
|
||||
service_uuids=[],
|
||||
is_new=False,
|
||||
is_persistent=False,
|
||||
is_beacon_like=False,
|
||||
is_strong_stable=False,
|
||||
has_random_address=False,
|
||||
)
|
||||
|
||||
result = device_to_dict(device)
|
||||
|
||||
assert result['rssi_current'] is None
|
||||
assert result['name'] is None
|
||||
assert result['manufacturer_name'] is None
|
||||
@@ -0,0 +1,357 @@
|
||||
"""Unit tests for Bluetooth heuristic detection."""
|
||||
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from utils.bluetooth.heuristics import HeuristicsEngine
|
||||
from utils.bluetooth.models import BTDeviceAggregate
|
||||
from utils.bluetooth.constants import (
|
||||
HEURISTIC_PERSISTENT_MIN_SEEN,
|
||||
HEURISTIC_PERSISTENT_WINDOW_SECONDS,
|
||||
HEURISTIC_BEACON_VARIANCE_THRESHOLD,
|
||||
HEURISTIC_STRONG_STABLE_RSSI,
|
||||
HEURISTIC_STRONG_STABLE_VARIANCE,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def engine():
|
||||
"""Create a fresh HeuristicsEngine for testing."""
|
||||
return HeuristicsEngine()
|
||||
|
||||
|
||||
def create_device_aggregate(
|
||||
address="AA:BB:CC:DD:EE:FF",
|
||||
address_type="public",
|
||||
protocol="ble",
|
||||
first_seen=None,
|
||||
last_seen=None,
|
||||
seen_count=1,
|
||||
rssi_current=-60,
|
||||
rssi_median=-60,
|
||||
rssi_variance=5.0,
|
||||
rssi_samples=None,
|
||||
is_new=False,
|
||||
):
|
||||
"""Helper to create BTDeviceAggregate for testing."""
|
||||
now = datetime.now()
|
||||
if first_seen is None:
|
||||
first_seen = now - timedelta(seconds=30)
|
||||
if last_seen is None:
|
||||
last_seen = now
|
||||
if rssi_samples is None:
|
||||
rssi_samples = [(now, rssi_current)]
|
||||
|
||||
return BTDeviceAggregate(
|
||||
device_id=f"{address}:{address_type}",
|
||||
address=address,
|
||||
address_type=address_type,
|
||||
protocol=protocol,
|
||||
first_seen=first_seen,
|
||||
last_seen=last_seen,
|
||||
seen_count=seen_count,
|
||||
seen_rate=seen_count / 60.0,
|
||||
rssi_samples=rssi_samples,
|
||||
rssi_current=rssi_current,
|
||||
rssi_median=rssi_median,
|
||||
rssi_min=rssi_median - 10,
|
||||
rssi_max=rssi_median + 10,
|
||||
rssi_variance=rssi_variance,
|
||||
rssi_confidence=0.8,
|
||||
range_band="nearby",
|
||||
range_confidence=0.7,
|
||||
name="Test Device",
|
||||
manufacturer_id=None,
|
||||
manufacturer_name=None,
|
||||
manufacturer_bytes=None,
|
||||
service_uuids=[],
|
||||
is_new=is_new,
|
||||
is_persistent=False,
|
||||
is_beacon_like=False,
|
||||
is_strong_stable=False,
|
||||
has_random_address=address_type != "public",
|
||||
)
|
||||
|
||||
|
||||
class TestPersistentHeuristic:
|
||||
"""Tests for persistent device detection."""
|
||||
|
||||
def test_persistent_high_seen_count(self, engine):
|
||||
"""Test device with high seen count is marked persistent."""
|
||||
device = create_device_aggregate(
|
||||
seen_count=HEURISTIC_PERSISTENT_MIN_SEEN + 5,
|
||||
first_seen=datetime.now() - timedelta(seconds=HEURISTIC_PERSISTENT_WINDOW_SECONDS - 60),
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_persistent is True
|
||||
|
||||
def test_not_persistent_low_seen_count(self, engine):
|
||||
"""Test device with low seen count is not persistent."""
|
||||
device = create_device_aggregate(seen_count=2)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_persistent is False
|
||||
|
||||
def test_not_persistent_outside_window(self, engine):
|
||||
"""Test device seen long ago is not persistent."""
|
||||
device = create_device_aggregate(
|
||||
seen_count=HEURISTIC_PERSISTENT_MIN_SEEN + 5,
|
||||
first_seen=datetime.now() - timedelta(seconds=HEURISTIC_PERSISTENT_WINDOW_SECONDS + 3600),
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
# Should still be considered persistent if high seen count
|
||||
assert result.is_persistent is True
|
||||
|
||||
|
||||
class TestBeaconLikeHeuristic:
|
||||
"""Tests for beacon-like behavior detection."""
|
||||
|
||||
def test_beacon_like_stable_intervals(self, engine):
|
||||
"""Test device with stable advertisement intervals is beacon-like."""
|
||||
now = datetime.now()
|
||||
# Create samples with very stable intervals (every 1 second)
|
||||
rssi_samples = [(now - timedelta(seconds=i), -60) for i in range(20)]
|
||||
|
||||
device = create_device_aggregate(
|
||||
seen_count=20,
|
||||
rssi_samples=rssi_samples,
|
||||
rssi_variance=1.0, # Very low variance
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
# Beacon-like depends on interval analysis
|
||||
# With regular samples, should detect beacon-like behavior
|
||||
assert result.is_beacon_like is True or result.rssi_variance < HEURISTIC_BEACON_VARIANCE_THRESHOLD
|
||||
|
||||
def test_not_beacon_like_irregular_intervals(self, engine):
|
||||
"""Test device with irregular intervals is not beacon-like."""
|
||||
now = datetime.now()
|
||||
# Create samples with irregular intervals
|
||||
rssi_samples = [
|
||||
(now - timedelta(seconds=0), -60),
|
||||
(now - timedelta(seconds=5), -65),
|
||||
(now - timedelta(seconds=7), -58),
|
||||
(now - timedelta(seconds=25), -62),
|
||||
(now - timedelta(seconds=30), -60),
|
||||
]
|
||||
|
||||
device = create_device_aggregate(
|
||||
seen_count=5,
|
||||
rssi_samples=rssi_samples,
|
||||
rssi_variance=15.0, # Higher variance
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
# Irregular intervals should not be beacon-like
|
||||
# (implementation may vary)
|
||||
assert isinstance(result.is_beacon_like, bool)
|
||||
|
||||
|
||||
class TestStrongStableHeuristic:
|
||||
"""Tests for strong and stable signal detection."""
|
||||
|
||||
def test_strong_stable_device(self, engine):
|
||||
"""Test device with strong, stable signal."""
|
||||
device = create_device_aggregate(
|
||||
rssi_current=HEURISTIC_STRONG_STABLE_RSSI + 5, # Stronger than threshold
|
||||
rssi_median=HEURISTIC_STRONG_STABLE_RSSI + 5,
|
||||
rssi_variance=HEURISTIC_STRONG_STABLE_VARIANCE - 1, # Less variance than threshold
|
||||
seen_count=15,
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_strong_stable is True
|
||||
|
||||
def test_not_strong_weak_signal(self, engine):
|
||||
"""Test device with weak signal is not strong_stable."""
|
||||
device = create_device_aggregate(
|
||||
rssi_current=-80,
|
||||
rssi_median=-80,
|
||||
rssi_variance=2.0,
|
||||
seen_count=15,
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_strong_stable is False
|
||||
|
||||
def test_not_stable_high_variance(self, engine):
|
||||
"""Test device with high variance is not strong_stable."""
|
||||
device = create_device_aggregate(
|
||||
rssi_current=-45,
|
||||
rssi_median=-45,
|
||||
rssi_variance=HEURISTIC_STRONG_STABLE_VARIANCE + 5,
|
||||
seen_count=15,
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_strong_stable is False
|
||||
|
||||
|
||||
class TestRandomAddressHeuristic:
|
||||
"""Tests for random address detection."""
|
||||
|
||||
def test_random_address_detected(self, engine):
|
||||
"""Test random address type is detected."""
|
||||
device = create_device_aggregate(address_type="random")
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.has_random_address is True
|
||||
|
||||
def test_public_address_not_random(self, engine):
|
||||
"""Test public address is not marked random."""
|
||||
device = create_device_aggregate(address_type="public")
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.has_random_address is False
|
||||
|
||||
def test_rpa_address_random(self, engine):
|
||||
"""Test RPA (Resolvable Private Address) is marked random."""
|
||||
device = create_device_aggregate(address_type="rpa")
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.has_random_address is True
|
||||
|
||||
|
||||
class TestNewDeviceHeuristic:
|
||||
"""Tests for new device detection."""
|
||||
|
||||
def test_new_device_flag_preserved(self, engine):
|
||||
"""Test is_new flag is preserved from input."""
|
||||
device = create_device_aggregate(is_new=True)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_new is True
|
||||
|
||||
def test_not_new_flag_preserved(self, engine):
|
||||
"""Test is_new=False is preserved."""
|
||||
device = create_device_aggregate(is_new=False)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_new is False
|
||||
|
||||
|
||||
class TestMultipleHeuristics:
|
||||
"""Tests for combinations of heuristics."""
|
||||
|
||||
def test_multiple_flags_can_be_true(self, engine):
|
||||
"""Test device can have multiple heuristic flags."""
|
||||
device = create_device_aggregate(
|
||||
address_type="random",
|
||||
seen_count=HEURISTIC_PERSISTENT_MIN_SEEN + 5,
|
||||
rssi_current=HEURISTIC_STRONG_STABLE_RSSI + 10,
|
||||
rssi_median=HEURISTIC_STRONG_STABLE_RSSI + 10,
|
||||
rssi_variance=1.0,
|
||||
is_new=True,
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
|
||||
# Multiple flags can be true
|
||||
assert result.has_random_address is True
|
||||
assert result.is_new is True
|
||||
# At least some of these should be true
|
||||
assert result.is_persistent is True or result.is_strong_stable is True
|
||||
|
||||
def test_all_flags_false_possible(self, engine):
|
||||
"""Test device can have all heuristic flags false."""
|
||||
device = create_device_aggregate(
|
||||
address_type="public",
|
||||
seen_count=1,
|
||||
rssi_current=-85,
|
||||
rssi_median=-85,
|
||||
rssi_variance=20.0,
|
||||
is_new=False,
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
|
||||
assert result.has_random_address is False
|
||||
assert result.is_new is False
|
||||
assert result.is_persistent is False
|
||||
assert result.is_strong_stable is False
|
||||
|
||||
|
||||
class TestHeuristicsBatchEvaluation:
|
||||
"""Tests for batch evaluation of multiple devices."""
|
||||
|
||||
def test_evaluate_multiple_devices(self, engine):
|
||||
"""Test evaluating multiple devices at once."""
|
||||
devices = [
|
||||
create_device_aggregate(
|
||||
address=f"AA:BB:CC:DD:EE:{i:02X}",
|
||||
seen_count=i * 5,
|
||||
)
|
||||
for i in range(1, 6)
|
||||
]
|
||||
|
||||
results = engine.evaluate_batch(devices)
|
||||
|
||||
assert len(results) == 5
|
||||
# Device with highest seen count should be persistent
|
||||
most_seen = max(results, key=lambda d: d.seen_count)
|
||||
# May or may not be persistent depending on exact thresholds
|
||||
assert isinstance(most_seen.is_persistent, bool)
|
||||
|
||||
def test_evaluate_empty_list(self, engine):
|
||||
"""Test evaluating empty device list."""
|
||||
results = engine.evaluate_batch([])
|
||||
assert results == []
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Tests for edge cases and boundary conditions."""
|
||||
|
||||
def test_null_rssi_values(self, engine):
|
||||
"""Test device with null RSSI values."""
|
||||
device = create_device_aggregate(
|
||||
rssi_current=None,
|
||||
rssi_median=None,
|
||||
rssi_variance=None,
|
||||
rssi_samples=[],
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
# Should not crash, strong_stable should be False
|
||||
assert result.is_strong_stable is False
|
||||
|
||||
def test_exactly_at_threshold(self, engine):
|
||||
"""Test device exactly at persistent threshold."""
|
||||
device = create_device_aggregate(
|
||||
seen_count=HEURISTIC_PERSISTENT_MIN_SEEN, # Exactly at threshold
|
||||
first_seen=datetime.now() - timedelta(seconds=HEURISTIC_PERSISTENT_WINDOW_SECONDS),
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
# At threshold, should be persistent
|
||||
assert isinstance(result.is_persistent, bool)
|
||||
|
||||
def test_zero_seen_count(self, engine):
|
||||
"""Test device with zero seen count (edge case)."""
|
||||
device = create_device_aggregate(seen_count=0)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_persistent is False
|
||||
|
||||
def test_negative_rssi_boundary(self, engine):
|
||||
"""Test RSSI at boundary values."""
|
||||
device = create_device_aggregate(
|
||||
rssi_current=-100, # Very weak
|
||||
rssi_median=-100,
|
||||
)
|
||||
|
||||
result = engine.evaluate(device)
|
||||
assert result.is_strong_stable is False
|
||||
|
||||
# Test strongest possible
|
||||
device2 = create_device_aggregate(
|
||||
rssi_current=-20, # Very strong
|
||||
rssi_median=-20,
|
||||
rssi_variance=1.0,
|
||||
seen_count=10,
|
||||
)
|
||||
|
||||
result2 = engine.evaluate(device2)
|
||||
assert result2.is_strong_stable is True
|
||||
Reference in New Issue
Block a user