mirror of
https://github.com/smittix/intercept.git
synced 2026-06-08 14:11:54 -07:00
feat(drone): add data models and RF signature table
This commit is contained in:
@@ -0,0 +1,67 @@
|
||||
# tests/test_drone_models.py
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from utils.drone.models import DroneContact, RFSignal
|
||||
from utils.drone.signatures import match_signature
|
||||
|
||||
|
||||
def _now():
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def test_drone_contact_to_dict_minimal():
|
||||
c = DroneContact(id="abc123", first_seen=_now(), last_seen=_now())
|
||||
d = c.to_dict()
|
||||
assert d["id"] == "abc123"
|
||||
assert d["compliant"] is False
|
||||
assert d["risk_level"] == "low"
|
||||
assert d["detection_vectors"] == []
|
||||
assert d["position"] is None
|
||||
|
||||
|
||||
def test_drone_contact_to_dict_with_position():
|
||||
c = DroneContact(id="xyz", first_seen=_now(), last_seen=_now())
|
||||
c.position = (51.5, -0.1)
|
||||
c.serial_number = "SN001"
|
||||
c.compliant = True
|
||||
c.detection_vectors = {"REMOTE_ID_WIFI"}
|
||||
d = c.to_dict()
|
||||
assert d["position"] == [51.5, -0.1]
|
||||
assert d["serial_number"] == "SN001"
|
||||
assert d["detection_vectors"] == ["REMOTE_ID_WIFI"]
|
||||
|
||||
|
||||
def test_drone_contact_position_history_capped():
|
||||
c = DroneContact(id="cap", first_seen=_now(), last_seen=_now())
|
||||
for i in range(510):
|
||||
c.position_history.append((float(i), float(i), _now()))
|
||||
d = c.to_dict()
|
||||
# to_dict sends last 50
|
||||
assert len(d["position_history"]) == 50
|
||||
|
||||
|
||||
def test_rf_signal_fields():
|
||||
s = RFSignal(frequency_hz=433_920_000, protocol="FRSKY", rssi=-65.0, hardware="RTL433", timestamp=_now())
|
||||
assert s.frequency_hz == 433_920_000
|
||||
assert s.protocol == "FRSKY"
|
||||
|
||||
|
||||
def test_match_signature_frsky_433():
|
||||
assert match_signature(433_920_000) == "FRSKY"
|
||||
|
||||
|
||||
def test_match_signature_ocusync_24():
|
||||
assert match_signature(2_440_000_000) == "DJI_OCUSYNC"
|
||||
|
||||
|
||||
def test_match_signature_fpv_58():
|
||||
assert match_signature(5_800_000_000) == "FPV_VIDEO"
|
||||
|
||||
|
||||
def test_match_signature_ocusync_at_2450mhz():
|
||||
# 2,450 MHz is within the DJI_OCUSYNC band
|
||||
assert match_signature(2_450_000_000) == "DJI_OCUSYNC"
|
||||
|
||||
|
||||
def test_match_signature_unrecognised():
|
||||
assert match_signature(100_000_000) == "UNKNOWN"
|
||||
@@ -0,0 +1,5 @@
|
||||
"""Drone intelligence utilities — multi-vector UAV detection."""
|
||||
|
||||
from .models import DroneContact, RemoteIDObservation, RFObservation, RFSignal
|
||||
|
||||
__all__ = ["DroneContact", "RemoteIDObservation", "RFObservation", "RFSignal"]
|
||||
@@ -0,0 +1,87 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
|
||||
_MAX_HISTORY_IN_DICT = 50
|
||||
_MAX_RF_IN_DICT = 10
|
||||
|
||||
|
||||
@dataclass
|
||||
class RFSignal:
|
||||
frequency_hz: int
|
||||
protocol: str
|
||||
rssi: float
|
||||
hardware: str # "RTL433" | "HACKRF"
|
||||
timestamp: datetime
|
||||
|
||||
|
||||
@dataclass
|
||||
class RemoteIDObservation:
|
||||
source: str # "WIFI" | "BLE"
|
||||
serial_number: str
|
||||
operator_id: str
|
||||
lat: float
|
||||
lon: float
|
||||
altitude_m: float
|
||||
speed_ms: float
|
||||
heading: float
|
||||
timestamp: datetime
|
||||
|
||||
|
||||
@dataclass
|
||||
class RFObservation:
|
||||
frequency_hz: int
|
||||
protocol: str
|
||||
rssi: float
|
||||
hardware: str # "RTL433" | "HACKRF"
|
||||
timestamp: datetime
|
||||
|
||||
|
||||
@dataclass
|
||||
class DroneContact:
|
||||
id: str
|
||||
first_seen: datetime
|
||||
last_seen: datetime
|
||||
serial_number: str | None = None
|
||||
operator_id: str | None = None
|
||||
position: tuple[float, float] | None = None
|
||||
altitude_m: float | None = None
|
||||
speed_ms: float | None = None
|
||||
heading: float | None = None
|
||||
position_history: list[tuple[float, float, datetime]] = field(default_factory=list)
|
||||
rf_signals: list[RFSignal] = field(default_factory=list)
|
||||
compliant: bool = False
|
||||
detection_vectors: set[str] = field(default_factory=set)
|
||||
confidence: float = 0.0
|
||||
risk_level: str = "low"
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"id": self.id,
|
||||
"first_seen": self.first_seen.isoformat(),
|
||||
"last_seen": self.last_seen.isoformat(),
|
||||
"serial_number": self.serial_number,
|
||||
"operator_id": self.operator_id,
|
||||
"position": list(self.position) if self.position else None,
|
||||
"altitude_m": self.altitude_m,
|
||||
"speed_ms": self.speed_ms,
|
||||
"heading": self.heading,
|
||||
"position_history": [
|
||||
{"lat": p[0], "lon": p[1], "ts": p[2].isoformat()}
|
||||
for p in self.position_history[-_MAX_HISTORY_IN_DICT:]
|
||||
],
|
||||
"rf_signals": [
|
||||
{
|
||||
"frequency_hz": s.frequency_hz,
|
||||
"protocol": s.protocol,
|
||||
"rssi": s.rssi,
|
||||
"hardware": s.hardware,
|
||||
}
|
||||
for s in self.rf_signals[-_MAX_RF_IN_DICT:]
|
||||
],
|
||||
"compliant": self.compliant,
|
||||
"detection_vectors": sorted(self.detection_vectors),
|
||||
"confidence": round(self.confidence, 2),
|
||||
"risk_level": self.risk_level,
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
"""Drone RF protocol signature table and frequency matcher."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
_SIGNATURES = [
|
||||
{
|
||||
"name": "FRSKY",
|
||||
"freq_min_hz": 433_050_000,
|
||||
"freq_max_hz": 434_790_000,
|
||||
},
|
||||
{
|
||||
"name": "FRSKY_868",
|
||||
"freq_min_hz": 868_000_000,
|
||||
"freq_max_hz": 868_600_000,
|
||||
},
|
||||
{
|
||||
"name": "DJI_OCUSYNC",
|
||||
"freq_min_hz": 2_400_000_000,
|
||||
"freq_max_hz": 2_483_500_000,
|
||||
},
|
||||
{
|
||||
"name": "FPV_VIDEO",
|
||||
"freq_min_hz": 5_725_000_000,
|
||||
"freq_max_hz": 5_875_000_000,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def match_signature(frequency_hz: int) -> str:
|
||||
"""Return the protocol name for a detected frequency, or 'UNKNOWN'."""
|
||||
for sig in _SIGNATURES:
|
||||
if sig["freq_min_hz"] <= frequency_hz <= sig["freq_max_hz"]:
|
||||
return sig["name"]
|
||||
return "UNKNOWN"
|
||||
Reference in New Issue
Block a user