diff --git a/tests/test_drone_remote_id.py b/tests/test_drone_remote_id.py new file mode 100644 index 0000000..05bc73b --- /dev/null +++ b/tests/test_drone_remote_id.py @@ -0,0 +1,92 @@ +# tests/test_drone_remote_id.py +import queue +import struct +from unittest.mock import MagicMock, patch + +from utils.drone.remote_id import RemoteIDScanner, _parse_ble_remote_id, _parse_wifi_remote_id + + +def _make_location_payload(lat=51.5, lon=-0.1, alt=50.0, speed=5.0, heading=90.0) -> bytes: + """Craft a minimal ASTM F3411 Location message (message type 0x01).""" + msg_type = 0x01 + status = 0x00 + lat_enc = int(lat * 1e7) + lon_enc = int(lon * 1e7) + alt_enc = int((alt + 1000) / 0.5) + speed_enc = int(speed / 0.25) + heading_enc = int(heading / 0.01) + return struct.pack(" bytes: + msg_type = 0x00 + id_type = 0x01 + serial_bytes = serial.encode("ascii").ljust(20, b"\x00")[:20] + return bytes([msg_type, id_type]) + serial_bytes + + +def _make_ble_adv_with_remote_id(payload: bytes) -> bytes: + uuid_bytes = b"\xfa\xff" + service_data_type = 0x16 + length = len(uuid_bytes) + len(payload) + 1 + return bytes([length, service_data_type]) + uuid_bytes + payload + + +def test_parse_ble_location_returns_observation(): + payload = _make_location_payload(lat=51.5, lon=-0.1, alt=50.0, speed=5.0, heading=90.0) + adv = _make_ble_adv_with_remote_id(payload) + obs = _parse_ble_remote_id(adv) + assert obs is not None + assert obs.source == "BLE" + assert abs(obs.lat - 51.5) < 0.0001 + assert abs(obs.lon - (-0.1)) < 0.0001 + assert abs(obs.altitude_m - 50.0) < 1.0 + assert abs(obs.speed_ms - 5.0) < 0.5 + + +def test_parse_ble_no_uuid_returns_none(): + obs = _parse_ble_remote_id(b"\x00\x01\x02\x03") + assert obs is None + + +def test_parse_ble_too_short_returns_none(): + adv = _make_ble_adv_with_remote_id(b"\x01\x00") + obs = _parse_ble_remote_id(adv) + assert obs is None + + +def test_parse_wifi_remote_id_returns_observation(): + payload = _make_location_payload(lat=52.0, lon=0.5) + obs = _parse_wifi_remote_id(payload) + assert obs is not None + assert obs.source == "WIFI" + assert abs(obs.lat - 52.0) < 0.0001 + + +def test_parse_wifi_non_location_returns_none(): + payload = _make_basic_id_payload() + obs = _parse_wifi_remote_id(payload) + assert obs is None + + +def test_scanner_start_stop(): + q = queue.Queue() + scanner = RemoteIDScanner(output_queue=q) + with ( + patch("utils.drone.remote_id.SCAPY_AVAILABLE", True), + patch("utils.drone.remote_id.AsyncSniffer") as mock_sniffer, + ): + mock_sniffer.return_value = MagicMock() + scanner.start(wifi_iface="wlan0mon") + assert scanner.running + scanner.stop() + assert not scanner.running + + +def test_scanner_start_without_scapy_still_works(): + q = queue.Queue() + scanner = RemoteIDScanner(output_queue=q) + with patch("utils.drone.remote_id.SCAPY_AVAILABLE", False): + scanner.start(wifi_iface=None) + assert scanner.running + scanner.stop() diff --git a/utils/drone/remote_id.py b/utils/drone/remote_id.py new file mode 100644 index 0000000..8238866 --- /dev/null +++ b/utils/drone/remote_id.py @@ -0,0 +1,122 @@ +# utils/drone/remote_id.py +"""Remote ID scanner — WiFi beacon + BLE advertisement parsing (ASTM F3411).""" + +from __future__ import annotations + +import contextlib +import logging +import queue +import struct +from datetime import datetime, timezone + +from .models import RemoteIDObservation + +logger = logging.getLogger("intercept.drone.remote_id") + +_REMOTE_ID_UUID_LE = b"\xfa\xff" +_LOCATION_MSG_TYPE = 0x01 +_MIN_LOCATION_PAYLOAD = 15 + +try: + from scapy.all import AsyncSniffer, Dot11Beacon, Dot11Elt + + SCAPY_AVAILABLE = True +except ImportError: + SCAPY_AVAILABLE = False + AsyncSniffer = None + Dot11Beacon = Dot11Elt = None + + +def _parse_ble_remote_id(adv_data: bytes) -> RemoteIDObservation | None: + """Parse a BLE advertisement containing an ASTM F3411 Remote ID payload.""" + idx = adv_data.find(_REMOTE_ID_UUID_LE) + if idx < 0: + return None + payload = adv_data[idx + 2 :] + return _parse_wifi_remote_id(payload, source="BLE") + + +def _parse_wifi_remote_id(payload: bytes, source: str = "WIFI") -> RemoteIDObservation | None: + """Parse raw ASTM F3411 Location payload bytes into a RemoteIDObservation.""" + if not payload or len(payload) < 2: + return None + msg_type = payload[0] & 0x0F + if msg_type != _LOCATION_MSG_TYPE: + return None + if len(payload) < _MIN_LOCATION_PAYLOAD: + return None + try: + lat_enc, lon_enc = struct.unpack_from(" None: + self._queue = output_queue + self._sniffer = None + self._running = False + + @property + def running(self) -> bool: + return self._running + + def _on_wifi_packet(self, pkt) -> None: + if not (Dot11Beacon and pkt.haslayer(Dot11Beacon)): + return + elt = pkt.getlayer(Dot11Elt) + while elt: + if elt.ID == 221 and elt.info: + obs = _parse_wifi_remote_id(elt.info) + if obs: + with contextlib.suppress(queue.Full): + self._queue.put_nowait(obs) + elt = elt.payload if hasattr(elt, "payload") and isinstance(elt.payload, Dot11Elt) else None + + def start(self, wifi_iface: str | None = None) -> None: + self._running = True + if SCAPY_AVAILABLE and wifi_iface: + try: + self._sniffer = AsyncSniffer( + iface=wifi_iface, + filter="type mgt subtype beacon", + prn=self._on_wifi_packet, + store=False, + ) + self._sniffer.start() + logger.info("WiFi Remote ID sniffer started on %s", wifi_iface) + except Exception as exc: + logger.warning("WiFi Remote ID sniffer failed to start: %s", exc) + else: + logger.info("WiFi Remote ID unavailable (scapy=%s, iface=%s)", SCAPY_AVAILABLE, wifi_iface) + + def stop(self) -> None: + self._running = False + if self._sniffer: + with contextlib.suppress(Exception): + self._sniffer.stop() + self._sniffer = None