feat(drone): add RemoteIDScanner with BLE/WiFi ASTM F3411 parsing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-05-03 12:39:06 +01:00
parent 772b5d0973
commit a6ce5d5426
2 changed files with 214 additions and 0 deletions
+92
View File
@@ -0,0 +1,92 @@
# tests/test_drone_remote_id.py
import queue
import struct
from unittest.mock import MagicMock, patch
from utils.drone.remote_id import RemoteIDScanner, _parse_ble_remote_id, _parse_wifi_remote_id
def _make_location_payload(lat=51.5, lon=-0.1, alt=50.0, speed=5.0, heading=90.0) -> bytes:
"""Craft a minimal ASTM F3411 Location message (message type 0x01)."""
msg_type = 0x01
status = 0x00
lat_enc = int(lat * 1e7)
lon_enc = int(lon * 1e7)
alt_enc = int((alt + 1000) / 0.5)
speed_enc = int(speed / 0.25)
heading_enc = int(heading / 0.01)
return struct.pack("<BBiiHBH", msg_type, status, lat_enc, lon_enc, alt_enc, speed_enc, heading_enc)
def _make_basic_id_payload(serial="SN-TESTSERIAL") -> bytes:
msg_type = 0x00
id_type = 0x01
serial_bytes = serial.encode("ascii").ljust(20, b"\x00")[:20]
return bytes([msg_type, id_type]) + serial_bytes
def _make_ble_adv_with_remote_id(payload: bytes) -> bytes:
uuid_bytes = b"\xfa\xff"
service_data_type = 0x16
length = len(uuid_bytes) + len(payload) + 1
return bytes([length, service_data_type]) + uuid_bytes + payload
def test_parse_ble_location_returns_observation():
payload = _make_location_payload(lat=51.5, lon=-0.1, alt=50.0, speed=5.0, heading=90.0)
adv = _make_ble_adv_with_remote_id(payload)
obs = _parse_ble_remote_id(adv)
assert obs is not None
assert obs.source == "BLE"
assert abs(obs.lat - 51.5) < 0.0001
assert abs(obs.lon - (-0.1)) < 0.0001
assert abs(obs.altitude_m - 50.0) < 1.0
assert abs(obs.speed_ms - 5.0) < 0.5
def test_parse_ble_no_uuid_returns_none():
obs = _parse_ble_remote_id(b"\x00\x01\x02\x03")
assert obs is None
def test_parse_ble_too_short_returns_none():
adv = _make_ble_adv_with_remote_id(b"\x01\x00")
obs = _parse_ble_remote_id(adv)
assert obs is None
def test_parse_wifi_remote_id_returns_observation():
payload = _make_location_payload(lat=52.0, lon=0.5)
obs = _parse_wifi_remote_id(payload)
assert obs is not None
assert obs.source == "WIFI"
assert abs(obs.lat - 52.0) < 0.0001
def test_parse_wifi_non_location_returns_none():
payload = _make_basic_id_payload()
obs = _parse_wifi_remote_id(payload)
assert obs is None
def test_scanner_start_stop():
q = queue.Queue()
scanner = RemoteIDScanner(output_queue=q)
with (
patch("utils.drone.remote_id.SCAPY_AVAILABLE", True),
patch("utils.drone.remote_id.AsyncSniffer") as mock_sniffer,
):
mock_sniffer.return_value = MagicMock()
scanner.start(wifi_iface="wlan0mon")
assert scanner.running
scanner.stop()
assert not scanner.running
def test_scanner_start_without_scapy_still_works():
q = queue.Queue()
scanner = RemoteIDScanner(output_queue=q)
with patch("utils.drone.remote_id.SCAPY_AVAILABLE", False):
scanner.start(wifi_iface=None)
assert scanner.running
scanner.stop()
+122
View File
@@ -0,0 +1,122 @@
# utils/drone/remote_id.py
"""Remote ID scanner — WiFi beacon + BLE advertisement parsing (ASTM F3411)."""
from __future__ import annotations
import contextlib
import logging
import queue
import struct
from datetime import datetime, timezone
from .models import RemoteIDObservation
logger = logging.getLogger("intercept.drone.remote_id")
_REMOTE_ID_UUID_LE = b"\xfa\xff"
_LOCATION_MSG_TYPE = 0x01
_MIN_LOCATION_PAYLOAD = 15
try:
from scapy.all import AsyncSniffer, Dot11Beacon, Dot11Elt
SCAPY_AVAILABLE = True
except ImportError:
SCAPY_AVAILABLE = False
AsyncSniffer = None
Dot11Beacon = Dot11Elt = None
def _parse_ble_remote_id(adv_data: bytes) -> RemoteIDObservation | None:
"""Parse a BLE advertisement containing an ASTM F3411 Remote ID payload."""
idx = adv_data.find(_REMOTE_ID_UUID_LE)
if idx < 0:
return None
payload = adv_data[idx + 2 :]
return _parse_wifi_remote_id(payload, source="BLE")
def _parse_wifi_remote_id(payload: bytes, source: str = "WIFI") -> RemoteIDObservation | None:
"""Parse raw ASTM F3411 Location payload bytes into a RemoteIDObservation."""
if not payload or len(payload) < 2:
return None
msg_type = payload[0] & 0x0F
if msg_type != _LOCATION_MSG_TYPE:
return None
if len(payload) < _MIN_LOCATION_PAYLOAD:
return None
try:
lat_enc, lon_enc = struct.unpack_from("<ii", payload, 2)
alt_enc = struct.unpack_from("<H", payload, 10)[0]
speed_enc = struct.unpack_from("<B", payload, 12)[0]
heading_enc = struct.unpack_from("<H", payload, 13)[0]
except struct.error:
return None
lat = lat_enc * 1e-7
lon = lon_enc * 1e-7
alt = alt_enc * 0.5 - 1000.0
speed = speed_enc * 0.25
heading = heading_enc * 0.01
if not (-90.0 <= lat <= 90.0) or not (-180.0 <= lon <= 180.0):
return None
return RemoteIDObservation(
source=source,
serial_number="",
operator_id="",
lat=lat,
lon=lon,
altitude_m=alt,
speed_ms=speed,
heading=heading,
timestamp=datetime.now(timezone.utc),
)
class RemoteIDScanner:
def __init__(self, output_queue: queue.Queue) -> None:
self._queue = output_queue
self._sniffer = None
self._running = False
@property
def running(self) -> bool:
return self._running
def _on_wifi_packet(self, pkt) -> None:
if not (Dot11Beacon and pkt.haslayer(Dot11Beacon)):
return
elt = pkt.getlayer(Dot11Elt)
while elt:
if elt.ID == 221 and elt.info:
obs = _parse_wifi_remote_id(elt.info)
if obs:
with contextlib.suppress(queue.Full):
self._queue.put_nowait(obs)
elt = elt.payload if hasattr(elt, "payload") and isinstance(elt.payload, Dot11Elt) else None
def start(self, wifi_iface: str | None = None) -> None:
self._running = True
if SCAPY_AVAILABLE and wifi_iface:
try:
self._sniffer = AsyncSniffer(
iface=wifi_iface,
filter="type mgt subtype beacon",
prn=self._on_wifi_packet,
store=False,
)
self._sniffer.start()
logger.info("WiFi Remote ID sniffer started on %s", wifi_iface)
except Exception as exc:
logger.warning("WiFi Remote ID sniffer failed to start: %s", exc)
else:
logger.info("WiFi Remote ID unavailable (scapy=%s, iface=%s)", SCAPY_AVAILABLE, wifi_iface)
def stop(self) -> None:
self._running = False
if self._sniffer:
with contextlib.suppress(Exception):
self._sniffer.stop()
self._sniffer = None