diff --git a/tests/test_drone_models.py b/tests/test_drone_models.py new file mode 100644 index 0000000..4577c7a --- /dev/null +++ b/tests/test_drone_models.py @@ -0,0 +1,67 @@ +# tests/test_drone_models.py +from datetime import datetime, timezone + +from utils.drone.models import DroneContact, RFSignal +from utils.drone.signatures import match_signature + + +def _now(): + return datetime.now(timezone.utc) + + +def test_drone_contact_to_dict_minimal(): + c = DroneContact(id="abc123", first_seen=_now(), last_seen=_now()) + d = c.to_dict() + assert d["id"] == "abc123" + assert d["compliant"] is False + assert d["risk_level"] == "low" + assert d["detection_vectors"] == [] + assert d["position"] is None + + +def test_drone_contact_to_dict_with_position(): + c = DroneContact(id="xyz", first_seen=_now(), last_seen=_now()) + c.position = (51.5, -0.1) + c.serial_number = "SN001" + c.compliant = True + c.detection_vectors = {"REMOTE_ID_WIFI"} + d = c.to_dict() + assert d["position"] == [51.5, -0.1] + assert d["serial_number"] == "SN001" + assert d["detection_vectors"] == ["REMOTE_ID_WIFI"] + + +def test_drone_contact_position_history_capped(): + c = DroneContact(id="cap", first_seen=_now(), last_seen=_now()) + for i in range(510): + c.position_history.append((float(i), float(i), _now())) + d = c.to_dict() + # to_dict sends last 50 + assert len(d["position_history"]) == 50 + + +def test_rf_signal_fields(): + s = RFSignal(frequency_hz=433_920_000, protocol="FRSKY", rssi=-65.0, hardware="RTL433", timestamp=_now()) + assert s.frequency_hz == 433_920_000 + assert s.protocol == "FRSKY" + + +def test_match_signature_frsky_433(): + assert match_signature(433_920_000) == "FRSKY" + + +def test_match_signature_ocusync_24(): + assert match_signature(2_440_000_000) == "DJI_OCUSYNC" + + +def test_match_signature_fpv_58(): + assert match_signature(5_800_000_000) == "FPV_VIDEO" + + +def test_match_signature_ocusync_at_2450mhz(): + # 2,450 MHz is within the DJI_OCUSYNC band + assert match_signature(2_450_000_000) == "DJI_OCUSYNC" + + +def test_match_signature_unrecognised(): + assert match_signature(100_000_000) == "UNKNOWN" diff --git a/utils/drone/__init__.py b/utils/drone/__init__.py new file mode 100644 index 0000000..60b3f3a --- /dev/null +++ b/utils/drone/__init__.py @@ -0,0 +1,5 @@ +"""Drone intelligence utilities — multi-vector UAV detection.""" + +from .models import DroneContact, RemoteIDObservation, RFObservation, RFSignal + +__all__ = ["DroneContact", "RemoteIDObservation", "RFObservation", "RFSignal"] diff --git a/utils/drone/models.py b/utils/drone/models.py new file mode 100644 index 0000000..2f9614b --- /dev/null +++ b/utils/drone/models.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime + +_MAX_HISTORY_IN_DICT = 50 +_MAX_RF_IN_DICT = 10 + + +@dataclass +class RFSignal: + frequency_hz: int + protocol: str + rssi: float + hardware: str # "RTL433" | "HACKRF" + timestamp: datetime + + +@dataclass +class RemoteIDObservation: + source: str # "WIFI" | "BLE" + serial_number: str + operator_id: str + lat: float + lon: float + altitude_m: float + speed_ms: float + heading: float + timestamp: datetime + + +@dataclass +class RFObservation: + frequency_hz: int + protocol: str + rssi: float + hardware: str # "RTL433" | "HACKRF" + timestamp: datetime + + +@dataclass +class DroneContact: + id: str + first_seen: datetime + last_seen: datetime + serial_number: str | None = None + operator_id: str | None = None + position: tuple[float, float] | None = None + altitude_m: float | None = None + speed_ms: float | None = None + heading: float | None = None + position_history: list[tuple[float, float, datetime]] = field(default_factory=list) + rf_signals: list[RFSignal] = field(default_factory=list) + compliant: bool = False + detection_vectors: set[str] = field(default_factory=set) + confidence: float = 0.0 + risk_level: str = "low" + + def to_dict(self) -> dict: + return { + "id": self.id, + "first_seen": self.first_seen.isoformat(), + "last_seen": self.last_seen.isoformat(), + "serial_number": self.serial_number, + "operator_id": self.operator_id, + "position": list(self.position) if self.position else None, + "altitude_m": self.altitude_m, + "speed_ms": self.speed_ms, + "heading": self.heading, + "position_history": [ + {"lat": p[0], "lon": p[1], "ts": p[2].isoformat()} + for p in self.position_history[-_MAX_HISTORY_IN_DICT:] + ], + "rf_signals": [ + { + "frequency_hz": s.frequency_hz, + "protocol": s.protocol, + "rssi": s.rssi, + "hardware": s.hardware, + } + for s in self.rf_signals[-_MAX_RF_IN_DICT:] + ], + "compliant": self.compliant, + "detection_vectors": sorted(self.detection_vectors), + "confidence": round(self.confidence, 2), + "risk_level": self.risk_level, + } diff --git a/utils/drone/signatures.py b/utils/drone/signatures.py new file mode 100644 index 0000000..f9f2743 --- /dev/null +++ b/utils/drone/signatures.py @@ -0,0 +1,34 @@ +"""Drone RF protocol signature table and frequency matcher.""" + +from __future__ import annotations + +_SIGNATURES = [ + { + "name": "FRSKY", + "freq_min_hz": 433_050_000, + "freq_max_hz": 434_790_000, + }, + { + "name": "FRSKY_868", + "freq_min_hz": 868_000_000, + "freq_max_hz": 868_600_000, + }, + { + "name": "DJI_OCUSYNC", + "freq_min_hz": 2_400_000_000, + "freq_max_hz": 2_483_500_000, + }, + { + "name": "FPV_VIDEO", + "freq_min_hz": 5_725_000_000, + "freq_max_hz": 5_875_000_000, + }, +] + + +def match_signature(frequency_hz: int) -> str: + """Return the protocol name for a detected frequency, or 'UNKNOWN'.""" + for sig in _SIGNATURES: + if sig["freq_min_hz"] <= frequency_hz <= sig["freq_max_hz"]: + return sig["name"] + return "UNKNOWN"