feat(drone): merge Drone Intelligence module

Multi-vector UAV detection mode: Remote ID (WiFi/BLE ASTM F3411),
RTL-SDR 433/868MHz control-link detection, HackRF 2.4/5.8GHz wideband.

Workers feed a shared observation queue; DroneCorrelator merges into
DroneContact objects with TTL store, risk scoring, and SSE streaming.
Frontend: two-panel sidebar + Leaflet map with contact cards and trails.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-05-05 08:36:27 +01:00
19 changed files with 1438 additions and 3 deletions
+3
View File
@@ -317,6 +317,9 @@ deauth_detector = None
deauth_detector_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
deauth_detector_lock = threading.Lock()
# Drone Intelligence
drone_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
# ============================================
# GLOBAL STATE DICTIONARIES
# ============================================
+1
View File
@@ -30,6 +30,7 @@ meshtastic>=2.0.0
# Deauthentication attack detection (optional - for WiFi TSCM)
scapy>=2.4.5
opendroneid>=1.0
# QR code generation for Meshtastic channels (optional)
qrcode[pil]>=7.4
+4 -1
View File
@@ -18,6 +18,7 @@ def register_blueprints(app):
from .bt_locate import bt_locate_bp
from .controller import controller_bp
from .correlation import correlation_bp
from .drone import drone_bp
from .dsc import dsc_bp
from .gps import gps_bp
from .ground_station import ground_station_bp
@@ -91,6 +92,7 @@ def register_blueprints(app):
app.register_blueprint(system_bp) # System health monitoring
app.register_blueprint(ook_bp) # Generic OOK signal decoder
app.register_blueprint(ground_station_bp) # Ground station automation
app.register_blueprint(drone_bp) # Drone intelligence / UAV detection
# Exempt all API blueprints from CSRF (they use JSON, not form tokens)
if _csrf:
@@ -99,5 +101,6 @@ def register_blueprints(app):
# Initialize TSCM state with queue and lock from app
import app as app_module
if hasattr(app_module, 'tscm_queue') and hasattr(app_module, 'tscm_lock'):
if hasattr(app_module, "tscm_queue") and hasattr(app_module, "tscm_lock"):
init_tscm_state(app_module.tscm_queue, app_module.tscm_lock)
+132
View File
@@ -0,0 +1,132 @@
"""Drone intelligence routes — multi-vector UAV detection."""
from __future__ import annotations
import logging
import queue
import threading
from flask import Blueprint, Response, jsonify, request
import app as app_module
from utils.constants import SSE_KEEPALIVE_INTERVAL, SSE_QUEUE_TIMEOUT
from utils.drone.correlator import DroneCorrelator
from utils.drone.remote_id import RemoteIDScanner
from utils.drone.rf_detector import RFDetector
from utils.sse import sse_stream_fanout
from utils.validation import validate_device_index
logger = logging.getLogger("intercept.drone")
drone_bp = Blueprint("drone", __name__, url_prefix="/drone")
_correlator: DroneCorrelator | None = None
_remote_id_scanner: RemoteIDScanner | None = None
_rf_detector: RFDetector | None = None
_obs_queue: queue.Queue | None = None # raw observations from scanners/detectors
_relay_thread: threading.Thread | None = None
_drone_running = False
_drone_lock = threading.Lock()
_SENTINEL = object()
def _relay_observations() -> None:
"""Read raw observations from _obs_queue and feed them into the correlator."""
while True:
obs = _obs_queue.get()
if obs is _SENTINEL:
break
if _correlator is not None:
_correlator.process(obs)
def _ensure_workers() -> None:
global _correlator, _remote_id_scanner, _rf_detector, _obs_queue, _relay_thread
if _obs_queue is None:
_obs_queue = queue.Queue(maxsize=512)
if _correlator is None:
_correlator = DroneCorrelator(output_queue=app_module.drone_queue)
if _remote_id_scanner is None:
_remote_id_scanner = RemoteIDScanner(output_queue=_obs_queue)
if _rf_detector is None:
_rf_detector = RFDetector(output_queue=_obs_queue)
if _relay_thread is None or not _relay_thread.is_alive():
_relay_thread = threading.Thread(target=_relay_observations, daemon=True)
_relay_thread.start()
@drone_bp.route("/status")
def status():
vectors = []
if _remote_id_scanner and _remote_id_scanner.running:
vectors.append("REMOTE_ID")
if _rf_detector and _rf_detector.running:
vectors.append("RF")
return jsonify(
{
"running": _drone_running,
"vectors": vectors,
"contact_count": len(_correlator.get_all()) if _correlator else 0,
}
)
@drone_bp.route("/contacts")
def contacts():
if not _correlator:
return jsonify([])
return jsonify(_correlator.get_all())
@drone_bp.route("/start", methods=["POST"])
def start():
global _drone_running
body = request.json or {}
wifi_iface = body.get("wifi_iface") or None
try:
rtl_index = validate_device_index(body.get("rtl_sdr_index", 0))
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
use_hackrf = bool(body.get("use_hackrf", True))
with _drone_lock:
_ensure_workers()
if not _drone_running:
if _remote_id_scanner:
_remote_id_scanner.start(wifi_iface=wifi_iface)
if _rf_detector:
_rf_detector.start(rtl_sdr_index=rtl_index, use_hackrf=use_hackrf)
_drone_running = True
logger.info("Drone detection started")
return jsonify({"status": "ok", "running": True})
@drone_bp.route("/stop", methods=["POST"])
def stop():
global _drone_running
with _drone_lock:
if _remote_id_scanner:
_remote_id_scanner.stop()
if _rf_detector:
_rf_detector.stop()
if _obs_queue is not None:
_obs_queue.put_nowait(_SENTINEL)
_drone_running = False
logger.info("Drone detection stopped")
return jsonify({"status": "ok", "running": False})
@drone_bp.route("/stream")
def stream():
return Response(
sse_stream_fanout(
source_queue=app_module.drone_queue,
channel_key="drone",
timeout=SSE_QUEUE_TIMEOUT,
keepalive_interval=SSE_KEEPALIVE_INTERVAL,
),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
+86
View File
@@ -0,0 +1,86 @@
/* Drone Intelligence Styles */
.drone-vector-pills {
display: flex;
flex-wrap: wrap;
gap: 6px;
margin-top: 4px;
}
.drone-vector-pill {
font-size: 10px;
font-family: var(--font-mono);
padding: 3px 8px;
border-radius: 3px;
background: var(--bg-primary);
color: var(--text-dim);
border: 1px solid var(--border-color);
transition: background 0.2s, color 0.2s;
}
.drone-vector-pill.active {
background: color-mix(in srgb, var(--accent-cyan) 15%, transparent);
color: var(--accent-cyan);
border-color: var(--accent-cyan);
}
.drone-contact-card {
background: var(--bg-card);
border: 1px solid var(--border-color);
border-radius: 4px;
padding: 10px 12px;
margin-bottom: 8px;
cursor: pointer;
transition: border-color 0.15s;
}
.drone-contact-card:hover {
border-color: var(--accent-cyan);
}
.drone-contact-card.high-risk {
border-left: 3px solid var(--accent-red);
}
.drone-contact-card.medium-risk {
border-left: 3px solid var(--accent-yellow);
}
.drone-contact-card.low-risk {
border-left: 3px solid var(--accent-green);
}
.drone-compliance-badge {
font-size: 9px;
font-family: var(--font-mono);
padding: 2px 6px;
border-radius: 2px;
font-weight: 600;
text-transform: uppercase;
}
.drone-compliance-badge.compliant {
background: color-mix(in srgb, var(--accent-green) 20%, transparent);
color: var(--accent-green);
}
.drone-compliance-badge.non-compliant {
background: color-mix(in srgb, var(--accent-red) 20%, transparent);
color: var(--accent-red);
}
.drone-map {
height: 280px;
border-radius: 4px;
border: 1px solid var(--border-color);
margin: 0 12px 12px;
}
.drone-marker-high-risk {
animation: dsc-distress-pulse 1.5s infinite;
}
@keyframes dsc-distress-pulse {
0%, 100% { opacity: 1; transform: scale(1); }
50% { opacity: 0.4; transform: scale(1.4); }
}
+203
View File
@@ -0,0 +1,203 @@
(function DroneMode() {
'use strict';
let _sse = null;
let _map = null;
let _markers = {};
let _trails = {};
let _running = false;
function init() {
document.getElementById('droneStartBtn')?.addEventListener('click', _start);
document.getElementById('droneStopBtn')?.addEventListener('click', _stop);
_initMap();
_connectSSE();
_refreshStatus();
}
function _initMap() {
if (_map) return;
const mapEl = document.getElementById('droneMap');
if (!mapEl || typeof L === 'undefined') return;
_map = L.map('droneMap', { zoomControl: true }).setView([20, 0], 2);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
attribution: '© OpenStreetMap',
maxZoom: 18,
}).addTo(_map);
}
function destroy() {
_disconnectSSE();
if (_map) {
_map.remove();
_map = null;
}
_markers = {};
_trails = {};
}
function _connectSSE() {
if (_sse) return;
_sse = new EventSource('/drone/stream');
_sse.addEventListener('message', function (e) {
try {
const msg = JSON.parse(e.data);
if (msg.type === 'contact') _handleContact(msg.data);
} catch (_) {}
});
_sse.onerror = function () {
_sse.close();
_sse = null;
setTimeout(_connectSSE, 3000);
};
}
function _disconnectSSE() {
if (_sse) { _sse.close(); _sse = null; }
}
function _handleContact(contact) {
_upsertCard(contact);
if (contact.position) _upsertMapMarker(contact);
_updateStats();
}
function _upsertCard(contact) {
const listEl = document.getElementById('droneContactList');
if (!listEl) return;
let card = document.getElementById('drone-card-' + contact.id);
if (!card) {
card = document.createElement('div');
card.id = 'drone-card-' + contact.id;
card.className = 'drone-contact-card';
card.addEventListener('click', function () { _focusContact(contact.id); });
listEl.prepend(card);
}
card.className = 'drone-contact-card ' + contact.risk_level + '-risk';
const complianceLabel = contact.compliant
? '<span class="drone-compliance-badge compliant">Remote ID</span>'
: '<span class="drone-compliance-badge non-compliant">No Remote ID</span>';
const vectors = (contact.detection_vectors || []).map(function (v) {
return '<span class="drone-vector-pill active">' + v + '</span>';
}).join('');
const alt = contact.altitude_m != null ? contact.altitude_m.toFixed(0) + 'm' : '—';
const spd = contact.speed_ms != null ? contact.speed_ms.toFixed(1) + 'm/s' : '—';
card.innerHTML = [
'<div style="display:flex; justify-content:space-between; align-items:center; margin-bottom:6px;">',
' <span style="font-family:var(--font-mono); font-size:11px; color:var(--accent-cyan);">' + (contact.serial_number || contact.id) + '</span>',
' ' + complianceLabel,
'</div>',
'<div class="drone-vector-pills" style="margin-bottom:6px;">' + vectors + '</div>',
'<div style="font-size:10px; color:var(--text-dim);">Alt: ' + alt + ' &nbsp; Speed: ' + spd + '</div>',
].join('');
}
function _upsertMapMarker(contact) {
if (!_map) return;
const lat = contact.position[0];
const lon = contact.position[1];
if (_markers[contact.id]) {
_markers[contact.id].setLatLng([lat, lon]);
} else {
const color = contact.risk_level === 'high' ? 'var(--accent-red)' :
contact.risk_level === 'medium' ? 'var(--accent-yellow)' :
'var(--accent-cyan)';
const icon = L.divIcon({
className: 'drone-map-icon' + (contact.risk_level === 'high' ? ' drone-marker-high-risk' : ''),
html: '<div style="width:10px;height:10px;border-radius:50%;background:' + color + ';border:2px solid #fff;"></div>',
iconSize: [10, 10],
iconAnchor: [5, 5],
});
_markers[contact.id] = L.marker([lat, lon], { icon: icon })
.addTo(_map)
.bindPopup('<b>' + (contact.serial_number || contact.id) + '</b><br>Risk: ' + contact.risk_level);
}
const trailPoints = (contact.position_history || []).map(function (p) {
return [p.lat, p.lon];
});
if (_trails[contact.id]) {
_trails[contact.id].setLatLngs(trailPoints);
} else if (trailPoints.length > 1) {
_trails[contact.id] = L.polyline(trailPoints, {
color: contact.risk_level === 'high' ? '#ff4444' : '#00ccff',
weight: 1.5,
opacity: 0.6,
}).addTo(_map);
}
}
function _focusContact(contactId) {
if (_map && _markers[contactId]) {
_map.panTo(_markers[contactId].getLatLng());
_markers[contactId].openPopup();
}
}
function _updateStats() {
fetch('/drone/contacts')
.then(function (r) { return r.json(); })
.then(function (contacts) {
const nonCompliant = contacts.filter(function (c) { return !c.compliant; }).length;
const countEl = document.getElementById('droneContactCount');
const ncEl = document.getElementById('droneNonCompliantCount');
if (countEl) countEl.textContent = contacts.length;
if (ncEl) ncEl.textContent = nonCompliant;
})
.catch(function () {});
}
function _refreshStatus() {
fetch('/drone/status')
.then(function (r) { return r.json(); })
.then(function (data) {
_running = data.running;
_setRunningUI(data.running);
_updateVectorPills(data.vectors || []);
})
.catch(function () {});
}
function _start() {
const iface = document.getElementById('droneWifiIface')?.value.trim() || null;
fetch('/drone/start', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ wifi_iface: iface }),
})
.then(function (r) { return r.json(); })
.then(function () { _setRunningUI(true); _refreshStatus(); })
.catch(function () {});
}
function _stop() {
fetch('/drone/stop', { method: 'POST' })
.then(function () { _setRunningUI(false); _refreshStatus(); })
.catch(function () {});
}
function _setRunningUI(running) {
const startBtn = document.getElementById('droneStartBtn');
const stopBtn = document.getElementById('droneStopBtn');
const statusEl = document.getElementById('droneStatusText');
if (startBtn) startBtn.disabled = running;
if (stopBtn) stopBtn.disabled = !running;
if (statusEl) {
statusEl.textContent = running ? 'Active' : 'Standby';
statusEl.style.color = running ? 'var(--accent-green)' : 'var(--accent-yellow)';
}
}
function _updateVectorPills(activeVectors) {
const pillMap = {
'REMOTE_ID': 'dronePillRemoteId',
'RTL433': 'dronePill433',
'HACKRF': 'dronePillHackrf',
};
Object.entries(pillMap).forEach(function ([key, id]) {
const el = document.getElementById(id);
if (el) el.classList.toggle('active', activeVectors.some(function (v) { return v.includes(key); }));
});
}
window.DroneMode = { init: init, destroy: destroy };
})();
+11 -2
View File
@@ -102,7 +102,8 @@
radiosonde: "{{ url_for('static', filename='css/modes/radiosonde.css') }}",
meteor: "{{ url_for('static', filename='css/modes/meteor.css') }}",
system: "{{ url_for('static', filename='css/modes/system.css') }}",
ook: "{{ url_for('static', filename='css/modes/ook.css') }}"
ook: "{{ url_for('static', filename='css/modes/ook.css') }}",
drone: "{{ url_for('static', filename='css/modes/drone.css') }}"
};
window.INTERCEPT_MODE_STYLE_LOADED = {};
window.INTERCEPT_MODE_STYLE_PROMISES = {};
@@ -186,7 +187,8 @@
spaceweather: "{{ url_for('static', filename='js/modes/space-weather.js') }}",
system: "{{ url_for('static', filename='js/modes/system.js') }}",
meteor: "{{ url_for('static', filename='js/modes/meteor.js') }}",
waterfall: "{{ url_for('static', filename='js/modes/waterfall.js') }}?v={{ version }}&r=wfdeck21"
waterfall: "{{ url_for('static', filename='js/modes/waterfall.js') }}?v={{ version }}&r=wfdeck21",
drone: "{{ url_for('static', filename='js/modes/drone.js') }}"
};
window.INTERCEPT_MODE_SCRIPT_LOADED = {};
window.INTERCEPT_MODE_SCRIPT_PROMISES = {};
@@ -764,6 +766,8 @@
{% include 'partials/modes/ais.html' %}
{% include 'partials/modes/drone.html' %}
{% include 'partials/modes/radiosonde.html' %}
{% include 'partials/modes/spy-stations.html' %}
@@ -3767,6 +3771,7 @@
wifi_locate: { label: 'WiFi Locate', indicator: 'WF LOCATE', outputTitle: 'WiFi Locate', group: 'wireless' },
meshtastic: { label: 'Meshtastic', indicator: 'MESHTASTIC', outputTitle: 'Meshtastic Mesh Monitor', group: 'wireless' },
tscm: { label: 'TSCM', indicator: 'TSCM', outputTitle: 'TSCM Counter-Surveillance', group: 'intel' },
drone: { label: 'Drone Intel', indicator: 'DRONE', outputTitle: 'Drone Intelligence', group: 'intel' },
spystations: { label: 'Spy Stations', indicator: 'SPY STATIONS', outputTitle: 'Spy Stations', group: 'intel' },
websdr: { label: 'WebSDR', indicator: 'WEBSDR', outputTitle: 'HF/Shortwave WebSDR', group: 'intel' },
waterfall: { label: 'Waterfall', indicator: 'WATERFALL', outputTitle: 'Spectrum Waterfall', group: 'signals' },
@@ -4403,6 +4408,7 @@
tscm: () => { if (tscmEventSource) { tscmEventSource.close(); tscmEventSource = null; } },
meteor: () => typeof MeteorScatter !== 'undefined' && MeteorScatter.destroy?.(),
ook: () => typeof OokMode !== 'undefined' && OokMode.destroy?.(),
drone: () => typeof DroneMode !== 'undefined' && DroneMode.destroy?.(),
};
return moduleDestroyMap[mode] || null;
}
@@ -4713,6 +4719,7 @@
document.getElementById('aprsMode')?.classList.toggle('active', mode === 'aprs');
document.getElementById('tscmMode')?.classList.toggle('active', mode === 'tscm');
document.getElementById('aisMode')?.classList.toggle('active', mode === 'ais');
document.getElementById('droneMode')?.classList.toggle('active', mode === 'drone');
document.getElementById('radiosondeMode')?.classList.toggle('active', mode === 'radiosonde');
document.getElementById('spystationsMode')?.classList.toggle('active', mode === 'spystations');
document.getElementById('meshtasticMode')?.classList.toggle('active', mode === 'meshtastic');
@@ -5011,6 +5018,8 @@
SystemHealth.init();
} else if (mode === 'ook') {
OokMode.init();
} else if (mode === 'drone') {
if (typeof DroneMode !== 'undefined') DroneMode.init();
}
if (requestId !== modeSwitchRequestId) return;
+49
View File
@@ -0,0 +1,49 @@
<!-- DRONE INTELLIGENCE MODE -->
<div id="droneMode" class="mode-content" style="display: none;">
<div class="section">
<h3>Drone Intelligence</h3>
<p class="info-text" style="margin-bottom: 12px;">
Multi-vector UAV detection: Remote ID (WiFi/BLE), 433/868&nbsp;MHz control links, 2.4/5.8&nbsp;GHz wideband.
</p>
</div>
<div class="section">
<h3>Detection Vectors</h3>
<div id="droneVectorStatus" class="drone-vector-pills">
<span class="drone-vector-pill" id="dronePillRemoteId">Remote ID</span>
<span class="drone-vector-pill" id="dronePill433">433 MHz</span>
<span class="drone-vector-pill" id="dronePillHackrf">2.4 / 5.8 GHz</span>
</div>
</div>
<div class="section">
<h3>WiFi Interface <span style="font-weight:400; font-size:11px; color:var(--text-dim)">(monitor mode)</span></h3>
<input type="text" id="droneWifiIface" placeholder="e.g. wlan0mon" style="width:100%;">
</div>
<div class="section">
<div style="display:flex; gap:8px;">
<button id="droneStartBtn" class="run-btn" style="flex:1;">Start</button>
<button id="droneStopBtn" class="stop-btn" style="flex:1;" disabled>Stop</button>
</div>
</div>
<div class="section">
<h3>Status</h3>
<p class="info-text">
Status: <span id="droneStatusText" style="color:var(--accent-yellow);">Standby</span>
</p>
<p class="info-text">
Contacts: <span id="droneContactCount">0</span>
&nbsp;|&nbsp;
Non-compliant: <span id="droneNonCompliantCount" style="color:var(--accent-red);">0</span>
</p>
</div>
<div class="section">
<h3>Detected Contacts</h3>
<div id="droneContactList"></div>
</div>
<div id="droneMap" class="drone-map"></div>
</div>
+134
View File
@@ -0,0 +1,134 @@
# tests/test_drone_correlator.py
import queue
import time
from datetime import datetime, timezone
import pytest
from utils.drone.correlator import DroneCorrelator
from utils.drone.models import RemoteIDObservation, RFObservation
def _now():
return datetime.now(timezone.utc)
def _remote_id_obs(serial="SN001", lat=51.5, lon=-0.1):
return RemoteIDObservation(
source="WIFI",
serial_number=serial,
operator_id="OP001",
lat=lat,
lon=lon,
altitude_m=50.0,
speed_ms=5.0,
heading=90.0,
timestamp=_now(),
)
def _rf_obs(freq=433_920_000, proto="FRSKY", rssi=-70.0):
return RFObservation(
frequency_hz=freq,
protocol=proto,
rssi=rssi,
hardware="RTL433",
timestamp=_now(),
)
@pytest.fixture
def correlator():
q = queue.Queue()
return DroneCorrelator(output_queue=q), q
def test_remote_id_creates_contact(correlator):
corr, q = correlator
corr.process(_remote_id_obs())
contacts = corr.get_all()
assert len(contacts) == 1
assert contacts[0]["compliant"] is True
assert contacts[0]["serial_number"] == "SN001"
assert contacts[0]["position"] == [51.5, -0.1]
def test_rf_creates_contact(correlator):
corr, q = correlator
corr.process(_rf_obs())
contacts = corr.get_all()
assert len(contacts) == 1
assert contacts[0]["compliant"] is False
def test_remote_id_emits_sse_event(correlator):
corr, q = correlator
corr.process(_remote_id_obs())
msg = q.get_nowait()
assert msg["type"] == "contact"
assert msg["data"]["serial_number"] == "SN001"
def test_same_serial_updates_contact(correlator):
corr, q = correlator
corr.process(_remote_id_obs(lat=51.5, lon=-0.1))
corr.process(_remote_id_obs(lat=51.6, lon=-0.2))
contacts = corr.get_all()
assert len(contacts) == 1
assert contacts[0]["position"] == [51.6, -0.2]
def test_different_serials_create_separate_contacts(correlator):
corr, q = correlator
corr.process(_remote_id_obs(serial="SN001"))
corr.process(_remote_id_obs(serial="SN002"))
contacts = corr.get_all()
assert len(contacts) == 2
def test_position_history_grows(correlator):
corr, q = correlator
for i in range(5):
corr.process(_remote_id_obs(lat=51.0 + i * 0.01, lon=-0.1))
contacts = corr.get_all()
assert len(contacts[0]["position_history"]) == 5
def test_position_history_capped_at_500(correlator):
corr, q = correlator
for i in range(510):
corr.process(_remote_id_obs(lat=float(i), lon=0.0))
store_values = list(corr._store.values())
assert len(store_values[0].position_history) == 500
def test_compliant_single_vector_is_low_risk(correlator):
corr, q = correlator
corr.process(_remote_id_obs())
contacts = corr.get_all()
assert contacts[0]["risk_level"] == "low"
def test_non_compliant_is_high_risk(correlator):
corr, q = correlator
corr.process(_rf_obs())
contacts = corr.get_all()
assert contacts[0]["risk_level"] == "high"
def test_confidence_increases_with_vectors(correlator):
corr, q = correlator
corr.process(_remote_id_obs())
contacts = {c["id"]: c for c in corr.get_all()}
rid_contact = next(c for c in contacts.values() if c["compliant"])
assert rid_contact["confidence"] == 0.25 # 1/4
def test_ttl_expiry_removes_contact(correlator):
corr, q = correlator
corr.process(_remote_id_obs())
assert len(corr.get_all()) == 1
for key in corr._store.timestamps:
corr._store.timestamps[key] = time.time() - 300
corr._store.cleanup()
assert len(corr.get_all()) == 0
+67
View File
@@ -0,0 +1,67 @@
# tests/test_drone_models.py
from datetime import datetime, timezone
from utils.drone.models import DroneContact, RFSignal
from utils.drone.signatures import match_signature
def _now():
return datetime.now(timezone.utc)
def test_drone_contact_to_dict_minimal():
c = DroneContact(id="abc123", first_seen=_now(), last_seen=_now())
d = c.to_dict()
assert d["id"] == "abc123"
assert d["compliant"] is False
assert d["risk_level"] == "low"
assert d["detection_vectors"] == []
assert d["position"] is None
def test_drone_contact_to_dict_with_position():
c = DroneContact(id="xyz", first_seen=_now(), last_seen=_now())
c.position = (51.5, -0.1)
c.serial_number = "SN001"
c.compliant = True
c.detection_vectors = {"REMOTE_ID_WIFI"}
d = c.to_dict()
assert d["position"] == [51.5, -0.1]
assert d["serial_number"] == "SN001"
assert d["detection_vectors"] == ["REMOTE_ID_WIFI"]
def test_drone_contact_position_history_capped():
c = DroneContact(id="cap", first_seen=_now(), last_seen=_now())
for i in range(510):
c.position_history.append((float(i), float(i), _now()))
d = c.to_dict()
# to_dict sends last 50
assert len(d["position_history"]) == 50
def test_rf_signal_fields():
s = RFSignal(frequency_hz=433_920_000, protocol="FRSKY", rssi=-65.0, hardware="RTL433", timestamp=_now())
assert s.frequency_hz == 433_920_000
assert s.protocol == "FRSKY"
def test_match_signature_frsky_433():
assert match_signature(433_920_000) == "FRSKY"
def test_match_signature_ocusync_24():
assert match_signature(2_440_000_000) == "DJI_OCUSYNC"
def test_match_signature_fpv_58():
assert match_signature(5_800_000_000) == "FPV_VIDEO"
def test_match_signature_ocusync_at_2450mhz():
# 2,450 MHz is within the DJI_OCUSYNC band
assert match_signature(2_450_000_000) == "DJI_OCUSYNC"
def test_match_signature_unrecognised():
assert match_signature(100_000_000) == "UNKNOWN"
+92
View File
@@ -0,0 +1,92 @@
# tests/test_drone_remote_id.py
import queue
import struct
from unittest.mock import MagicMock, patch
from utils.drone.remote_id import RemoteIDScanner, _parse_ble_remote_id, _parse_wifi_remote_id
def _make_location_payload(lat=51.5, lon=-0.1, alt=50.0, speed=5.0, heading=90.0) -> bytes:
"""Craft a minimal ASTM F3411 Location message (message type 0x01)."""
msg_type = 0x01
status = 0x00
lat_enc = int(lat * 1e7)
lon_enc = int(lon * 1e7)
alt_enc = int((alt + 1000) / 0.5)
speed_enc = int(speed / 0.25)
heading_enc = int(heading / 0.01)
return struct.pack("<BBiiHBH", msg_type, status, lat_enc, lon_enc, alt_enc, speed_enc, heading_enc)
def _make_basic_id_payload(serial="SN-TESTSERIAL") -> bytes:
msg_type = 0x00
id_type = 0x01
serial_bytes = serial.encode("ascii").ljust(20, b"\x00")[:20]
return bytes([msg_type, id_type]) + serial_bytes
def _make_ble_adv_with_remote_id(payload: bytes) -> bytes:
uuid_bytes = b"\xfa\xff"
service_data_type = 0x16
length = len(uuid_bytes) + len(payload) + 1
return bytes([length, service_data_type]) + uuid_bytes + payload
def test_parse_ble_location_returns_observation():
payload = _make_location_payload(lat=51.5, lon=-0.1, alt=50.0, speed=5.0, heading=90.0)
adv = _make_ble_adv_with_remote_id(payload)
obs = _parse_ble_remote_id(adv)
assert obs is not None
assert obs.source == "BLE"
assert abs(obs.lat - 51.5) < 0.0001
assert abs(obs.lon - (-0.1)) < 0.0001
assert abs(obs.altitude_m - 50.0) < 1.0
assert abs(obs.speed_ms - 5.0) < 0.5
def test_parse_ble_no_uuid_returns_none():
obs = _parse_ble_remote_id(b"\x00\x01\x02\x03")
assert obs is None
def test_parse_ble_too_short_returns_none():
adv = _make_ble_adv_with_remote_id(b"\x01\x00")
obs = _parse_ble_remote_id(adv)
assert obs is None
def test_parse_wifi_remote_id_returns_observation():
payload = _make_location_payload(lat=52.0, lon=0.5)
obs = _parse_wifi_remote_id(payload)
assert obs is not None
assert obs.source == "WIFI"
assert abs(obs.lat - 52.0) < 0.0001
def test_parse_wifi_non_location_returns_none():
payload = _make_basic_id_payload()
obs = _parse_wifi_remote_id(payload)
assert obs is None
def test_scanner_start_stop():
q = queue.Queue()
scanner = RemoteIDScanner(output_queue=q)
with (
patch("utils.drone.remote_id.SCAPY_AVAILABLE", True),
patch("utils.drone.remote_id.AsyncSniffer") as mock_sniffer,
):
mock_sniffer.return_value = MagicMock()
scanner.start(wifi_iface="wlan0mon")
assert scanner.running
scanner.stop()
assert not scanner.running
def test_scanner_start_without_scapy_still_works():
q = queue.Queue()
scanner = RemoteIDScanner(output_queue=q)
with patch("utils.drone.remote_id.SCAPY_AVAILABLE", False):
scanner.start(wifi_iface=None)
assert scanner.running
scanner.stop()
+94
View File
@@ -0,0 +1,94 @@
"""Tests for RFDetector (rtl_433 + hackrf_sweep control-link detection)."""
from __future__ import annotations
import json
import queue
from unittest.mock import MagicMock, patch
import pytest
from utils.drone.models import RFObservation
from utils.drone.rf_detector import RFDetector
@pytest.fixture
def detector():
q = queue.Queue()
return RFDetector(output_queue=q), q
def test_detector_not_running_initially(detector):
det, q = detector
assert not det.running
def test_rtl433_json_line_emits_observation(detector):
det, q = detector
rtl433_line = json.dumps(
{
"freq": 433920000,
"rssi": -68.5,
"protocol": "FrSky",
}
)
det._handle_rtl433_line(rtl433_line)
obs = q.get_nowait()
assert isinstance(obs, RFObservation)
assert obs.frequency_hz == 433_920_000
assert obs.hardware == "RTL433"
assert obs.rssi == -68.5
def test_rtl433_non_json_line_ignored(detector):
det, q = detector
det._handle_rtl433_line("not json at all")
assert q.empty()
def test_hackrf_sweep_line_emits_observation(detector):
det, q = detector
# hackrf_sweep CSV: date, time, hz_low, hz_high, hz_bin_width, num_samples, db, db, ...
hz_low = 2_440_000_000
hz_high = 2_441_000_000
sweep_line = f"2026-05-03, 12:00:00, {hz_low}, {hz_high}, 1000000, 10, -45.2, -46.1, -44.8"
det._handle_hackrf_line(sweep_line)
obs = q.get_nowait()
assert isinstance(obs, RFObservation)
assert obs.hardware == "HACKRF"
assert obs.frequency_hz == (hz_low + hz_high) // 2
assert obs.rssi < 0
def test_hackrf_sweep_below_threshold_ignored(detector):
det, q = detector
hz_low = 2_440_000_000
hz_high = 2_441_000_000
# Very low power — should be ignored (below -90 dBm threshold)
sweep_line = f"2026-05-03, 12:00:00, {hz_low}, {hz_high}, 1000000, 10, -95.0, -96.0, -95.5"
det._handle_hackrf_line(sweep_line)
assert q.empty()
def test_out_of_band_frequency_ignored(detector):
det, q = detector
# 915 MHz is not in any drone band
line = json.dumps({"freq": 915_000_000, "rssi": -50.0, "protocol": "Generic"})
det._handle_rtl433_line(line)
assert q.empty()
def test_start_stop(detector):
det, q = detector
mock_proc = MagicMock()
mock_proc.stdout = MagicMock()
mock_proc.stdout.readline = MagicMock(side_effect=[b""])
# Patch both shutil.which calls (rtl_433 in _run_rtl433, hackrf_sweep in _run_hackrf)
with (
patch("subprocess.Popen", return_value=mock_proc),
patch("utils.drone.rf_detector.shutil.which", return_value=None),
):
det.start(rtl_sdr_index=0, use_hackrf=False)
assert det.running
det.stop()
assert not det.running
+63
View File
@@ -0,0 +1,63 @@
import json
import queue
from unittest.mock import patch
import pytest
from flask import Flask
import app as app_module
from routes.drone import drone_bp
@pytest.fixture(autouse=True)
def mock_app_state(mocker):
mocker.patch.object(app_module, "drone_queue", queue.Queue())
yield
@pytest.fixture
def drone_app():
app = Flask(__name__)
app.register_blueprint(drone_bp)
app.config["TESTING"] = True
return app
@pytest.fixture
def client(drone_app):
return drone_app.test_client()
def test_status_returns_json(client):
resp = client.get("/drone/status")
assert resp.status_code == 200
data = json.loads(resp.data)
assert "running" in data
assert "vectors" in data
def test_contacts_returns_empty_list_when_idle(client):
resp = client.get("/drone/contacts")
assert resp.status_code == 200
data = json.loads(resp.data)
assert data == [] or isinstance(data, list)
def test_start_returns_ok(client):
with (
patch("routes.drone._correlator"),
patch("routes.drone._remote_id_scanner"),
patch("routes.drone._rf_detector"),
):
resp = client.post("/drone/start", json={})
assert resp.status_code == 200
def test_stop_returns_ok(client):
resp = client.post("/drone/stop")
assert resp.status_code == 200
def test_stream_returns_event_stream(client):
resp = client.get("/drone/stream")
assert resp.content_type.startswith("text/event-stream")
+5
View File
@@ -0,0 +1,5 @@
"""Drone intelligence utilities — multi-vector UAV detection."""
from .models import DroneContact, RemoteIDObservation, RFObservation, RFSignal
__all__ = ["DroneContact", "RemoteIDObservation", "RFObservation", "RFSignal"]
+87
View File
@@ -0,0 +1,87 @@
# utils/drone/correlator.py
from __future__ import annotations
import contextlib
import hashlib
import queue
from datetime import datetime, timezone
from utils.cleanup import DataStore, cleanup_manager
from .models import DroneContact, RemoteIDObservation, RFObservation, RFSignal
_CONTACT_TTL = 120.0
_MAX_POSITION_HISTORY = 500
def _contact_id_from_serial(serial: str) -> str:
return hashlib.sha1(f"serial:{serial}".encode()).hexdigest()[:12]
def _contact_id_from_rf(freq_hz: int, protocol: str) -> str:
return hashlib.sha1(f"rf:{freq_hz}:{protocol}".encode()).hexdigest()[:12]
def _compute_risk(contact: DroneContact) -> str:
if not contact.compliant:
return "high"
if len(contact.detection_vectors) > 1:
return "medium"
if len(contact.rf_signals) >= 2:
recent = sorted(contact.rf_signals, key=lambda s: s.timestamp)[-5:]
if abs(recent[-1].rssi - recent[0].rssi) > 15:
return "medium"
return "low"
class DroneCorrelator:
def __init__(self, output_queue: queue.Queue) -> None:
self._store: DataStore = DataStore(max_age_seconds=_CONTACT_TTL, name="drone_contacts")
self._output_queue = output_queue
cleanup_manager.register(self._store)
def process(self, obs: RemoteIDObservation | RFObservation) -> None:
now = datetime.now(timezone.utc)
if isinstance(obs, RemoteIDObservation):
contact_id = _contact_id_from_serial(obs.serial_number)
contact: DroneContact = self._store.get(contact_id) or DroneContact(
id=contact_id, first_seen=now, last_seen=now
)
contact.last_seen = now
contact.serial_number = obs.serial_number
contact.operator_id = obs.operator_id
contact.position = (obs.lat, obs.lon)
contact.altitude_m = obs.altitude_m
contact.speed_ms = obs.speed_ms
contact.heading = obs.heading
contact.compliant = True
contact.detection_vectors.add(f"REMOTE_ID_{obs.source}")
contact.position_history.append((obs.lat, obs.lon, now))
if len(contact.position_history) > _MAX_POSITION_HISTORY:
contact.position_history = contact.position_history[-_MAX_POSITION_HISTORY:]
else:
contact_id = _contact_id_from_rf(obs.frequency_hz, obs.protocol)
contact = self._store.get(contact_id) or DroneContact(id=contact_id, first_seen=now, last_seen=now)
contact.last_seen = now
contact.compliant = False
contact.detection_vectors.add(obs.hardware)
contact.rf_signals.append(
RFSignal(
frequency_hz=obs.frequency_hz,
protocol=obs.protocol,
rssi=obs.rssi,
hardware=obs.hardware,
timestamp=now,
)
)
contact.confidence = min(len(contact.detection_vectors) / 4.0, 1.0)
contact.risk_level = _compute_risk(contact)
self._store.set(contact_id, contact)
with contextlib.suppress(queue.Full):
self._output_queue.put_nowait({"type": "contact", "data": contact.to_dict()})
def get_all(self) -> list[dict]:
return [c.to_dict() for c in self._store.values()]
+87
View File
@@ -0,0 +1,87 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
_MAX_HISTORY_IN_DICT = 50
_MAX_RF_IN_DICT = 10
@dataclass
class RFSignal:
frequency_hz: int
protocol: str
rssi: float
hardware: str # "RTL433" | "HACKRF"
timestamp: datetime
@dataclass
class RemoteIDObservation:
source: str # "WIFI" | "BLE"
serial_number: str
operator_id: str
lat: float
lon: float
altitude_m: float
speed_ms: float
heading: float
timestamp: datetime
@dataclass
class RFObservation:
frequency_hz: int
protocol: str
rssi: float
hardware: str # "RTL433" | "HACKRF"
timestamp: datetime
@dataclass
class DroneContact:
id: str
first_seen: datetime
last_seen: datetime
serial_number: str | None = None
operator_id: str | None = None
position: tuple[float, float] | None = None
altitude_m: float | None = None
speed_ms: float | None = None
heading: float | None = None
position_history: list[tuple[float, float, datetime]] = field(default_factory=list)
rf_signals: list[RFSignal] = field(default_factory=list)
compliant: bool = False
detection_vectors: set[str] = field(default_factory=set)
confidence: float = 0.0
risk_level: str = "low"
def to_dict(self) -> dict:
return {
"id": self.id,
"first_seen": self.first_seen.isoformat(),
"last_seen": self.last_seen.isoformat(),
"serial_number": self.serial_number,
"operator_id": self.operator_id,
"position": list(self.position) if self.position else None,
"altitude_m": self.altitude_m,
"speed_ms": self.speed_ms,
"heading": self.heading,
"position_history": [
{"lat": p[0], "lon": p[1], "ts": p[2].isoformat()}
for p in self.position_history[-_MAX_HISTORY_IN_DICT:]
],
"rf_signals": [
{
"frequency_hz": s.frequency_hz,
"protocol": s.protocol,
"rssi": s.rssi,
"hardware": s.hardware,
}
for s in self.rf_signals[-_MAX_RF_IN_DICT:]
],
"compliant": self.compliant,
"detection_vectors": sorted(self.detection_vectors),
"confidence": round(self.confidence, 2),
"risk_level": self.risk_level,
}
+125
View File
@@ -0,0 +1,125 @@
# utils/drone/remote_id.py
"""Remote ID scanner — WiFi beacon + BLE advertisement parsing (ASTM F3411)."""
from __future__ import annotations
import contextlib
import logging
import queue
import struct
from datetime import datetime, timezone
from .models import RemoteIDObservation
logger = logging.getLogger("intercept.drone.remote_id")
_REMOTE_ID_UUID_LE = b"\xfa\xff"
_LOCATION_MSG_TYPE = 0x01
_MIN_LOCATION_PAYLOAD = 15
try:
from scapy.all import AsyncSniffer, Dot11Beacon, Dot11Elt
SCAPY_AVAILABLE = True
except ImportError:
SCAPY_AVAILABLE = False
AsyncSniffer = None
Dot11Beacon = Dot11Elt = None
def _parse_ble_remote_id(adv_data: bytes) -> RemoteIDObservation | None:
"""Parse a BLE advertisement containing an ASTM F3411 Remote ID payload."""
idx = adv_data.find(_REMOTE_ID_UUID_LE)
if idx < 0:
return None
payload = adv_data[idx + 2 :]
return _parse_wifi_remote_id(payload, source="BLE")
def _parse_wifi_remote_id(payload: bytes, source: str = "WIFI") -> RemoteIDObservation | None:
"""Parse raw ASTM F3411 Location payload bytes into a RemoteIDObservation."""
if not payload or len(payload) < 2:
return None
msg_type = payload[0] & 0x0F
if msg_type != _LOCATION_MSG_TYPE:
return None
if len(payload) < _MIN_LOCATION_PAYLOAD:
return None
try:
lat_enc, lon_enc = struct.unpack_from("<ii", payload, 2)
alt_enc = struct.unpack_from("<H", payload, 10)[0]
speed_enc = struct.unpack_from("<B", payload, 12)[0]
heading_enc = struct.unpack_from("<H", payload, 13)[0]
except struct.error:
return None
lat = lat_enc * 1e-7
lon = lon_enc * 1e-7
alt = alt_enc * 0.5 - 1000.0
speed = speed_enc * 0.25
heading = heading_enc * 0.01
if not (-90.0 <= lat <= 90.0) or not (-180.0 <= lon <= 180.0):
return None
return RemoteIDObservation(
source=source,
serial_number="",
operator_id="",
lat=lat,
lon=lon,
altitude_m=alt,
speed_ms=speed,
heading=heading,
timestamp=datetime.now(timezone.utc),
)
class RemoteIDScanner:
def __init__(self, output_queue: queue.Queue) -> None:
self._queue = output_queue
self._sniffer = None
self._running = False
@property
def running(self) -> bool:
return self._running
def _on_wifi_packet(self, pkt) -> None:
if not (Dot11Beacon and pkt.haslayer(Dot11Beacon)):
return
elt = pkt.getlayer(Dot11Elt)
while elt:
if elt.ID == 221 and elt.info:
obs = _parse_wifi_remote_id(elt.info)
if obs:
with contextlib.suppress(queue.Full):
self._queue.put_nowait(obs)
elt = elt.payload if hasattr(elt, "payload") and isinstance(elt.payload, Dot11Elt) else None
def start(self, wifi_iface: str | None = None) -> None:
if self._running:
return
self._running = True
if SCAPY_AVAILABLE and wifi_iface:
try:
sniffer = AsyncSniffer(
iface=wifi_iface,
filter="type mgt subtype beacon",
prn=self._on_wifi_packet,
store=False,
)
sniffer.start()
self._sniffer = sniffer
logger.info("WiFi Remote ID sniffer started on %s", wifi_iface)
except Exception as exc:
logger.warning("WiFi Remote ID sniffer failed to start: %s", exc)
else:
logger.info("WiFi Remote ID unavailable (scapy=%s, iface=%s)", SCAPY_AVAILABLE, wifi_iface)
def stop(self) -> None:
self._running = False
if self._sniffer:
with contextlib.suppress(Exception):
self._sniffer.stop()
self._sniffer = None
+161
View File
@@ -0,0 +1,161 @@
"""RF control-link detector — rtl_433 (433/868MHz) + hackrf_sweep (2.4/5.8GHz)."""
from __future__ import annotations
import contextlib
import json
import logging
import queue
import shutil
import subprocess
import threading
from datetime import datetime, timezone
from utils.process import register_process, safe_terminate
from .models import RFObservation
from .signatures import match_signature
logger = logging.getLogger("intercept.drone.rf_detector")
_HACKRF_THRESHOLD_DBM = -90.0
_DRONE_FREQ_RANGES_HZ = [
(433_000_000, 435_000_000),
(868_000_000, 869_000_000),
(2_400_000_000, 2_484_000_000),
(5_725_000_000, 5_875_000_000),
]
def _in_drone_band(freq_hz: int) -> bool:
return any(lo <= freq_hz <= hi for lo, hi in _DRONE_FREQ_RANGES_HZ)
class RFDetector:
def __init__(self, output_queue: queue.Queue) -> None:
self._queue = output_queue
self._stop_event = threading.Event()
self._stop_event.set() # starts in stopped state
self._proc_lock = threading.Lock()
self._rtl_proc: subprocess.Popen | None = None
self._hackrf_proc: subprocess.Popen | None = None
self._threads: list[threading.Thread] = []
@property
def running(self) -> bool:
return not self._stop_event.is_set()
def _handle_rtl433_line(self, line: str) -> None:
try:
data = json.loads(line)
except (json.JSONDecodeError, ValueError):
return
freq = data.get("freq")
rssi = data.get("rssi")
if freq is None or rssi is None:
return
freq_hz = int(float(freq))
if not _in_drone_band(freq_hz):
return
protocol = match_signature(freq_hz)
with contextlib.suppress(queue.Full):
self._queue.put_nowait(
RFObservation(
frequency_hz=freq_hz,
protocol=protocol,
rssi=float(rssi),
hardware="RTL433",
timestamp=datetime.now(timezone.utc),
)
)
def _handle_hackrf_line(self, line: str) -> None:
parts = [p.strip() for p in line.split(",")]
if len(parts) < 7:
return
try:
hz_low = int(parts[2])
hz_high = int(parts[3])
db_values = [float(p) for p in parts[6:] if p]
except (ValueError, IndexError):
return
if not db_values:
return
avg_db = sum(db_values) / len(db_values)
if avg_db < _HACKRF_THRESHOLD_DBM:
return
freq_hz = (hz_low + hz_high) // 2
if not _in_drone_band(freq_hz):
return
protocol = match_signature(freq_hz)
with contextlib.suppress(queue.Full):
self._queue.put_nowait(
RFObservation(
frequency_hz=freq_hz,
protocol=protocol,
rssi=avg_db,
hardware="HACKRF",
timestamp=datetime.now(timezone.utc),
)
)
def _run_rtl433(self, device_index: int) -> None:
rtl_bin = shutil.which("rtl_433")
if not rtl_bin:
logger.warning("rtl_433 not found — RTL-SDR RF detection disabled")
return
cmd = [rtl_bin, "-d", str(device_index), "-F", "json", "-f", "433920000", "-f", "868300000"]
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
register_process(proc)
with self._proc_lock:
self._rtl_proc = proc
for raw_line in iter(proc.stdout.readline, b""):
if self._stop_event.is_set():
break
self._handle_rtl433_line(raw_line.decode("utf-8", errors="replace").strip())
safe_terminate(proc)
except Exception as exc:
logger.warning("rtl_433 error: %s", exc)
def _run_hackrf(self) -> None:
hackrf_bin = shutil.which("hackrf_sweep")
if not hackrf_bin:
logger.warning("hackrf_sweep not found — HackRF RF detection disabled")
return
cmd = [hackrf_bin, "-f", "2400:2484", "-f", "5725:5875", "-w", "1000000"]
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
register_process(proc)
with self._proc_lock:
self._hackrf_proc = proc
for raw_line in iter(proc.stdout.readline, b""):
if self._stop_event.is_set():
break
self._handle_hackrf_line(raw_line.decode("utf-8", errors="replace").strip())
safe_terminate(proc)
except Exception as exc:
logger.warning("hackrf_sweep error: %s", exc)
def start(self, rtl_sdr_index: int = 0, use_hackrf: bool = True) -> None:
if self.running:
return
self._stop_event.clear()
t1 = threading.Thread(target=self._run_rtl433, args=(rtl_sdr_index,), daemon=True)
t1.start()
self._threads.append(t1)
if use_hackrf:
t2 = threading.Thread(target=self._run_hackrf, daemon=True)
t2.start()
self._threads.append(t2)
def stop(self) -> None:
self._stop_event.set()
with self._proc_lock:
rtl_proc = self._rtl_proc
hackrf_proc = self._hackrf_proc
self._rtl_proc = None
self._hackrf_proc = None
safe_terminate(rtl_proc)
safe_terminate(hackrf_proc)
self._threads.clear()
+34
View File
@@ -0,0 +1,34 @@
"""Drone RF protocol signature table and frequency matcher."""
from __future__ import annotations
_SIGNATURES = [
{
"name": "FRSKY",
"freq_min_hz": 433_050_000,
"freq_max_hz": 434_790_000,
},
{
"name": "FRSKY_868",
"freq_min_hz": 868_000_000,
"freq_max_hz": 868_600_000,
},
{
"name": "DJI_OCUSYNC",
"freq_min_hz": 2_400_000_000,
"freq_max_hz": 2_483_500_000,
},
{
"name": "FPV_VIDEO",
"freq_min_hz": 5_725_000_000,
"freq_max_hz": 5_875_000_000,
},
]
def match_signature(frequency_hz: int) -> str:
"""Return the protocol name for a detected frequency, or 'UNKNOWN'."""
for sig in _SIGNATURES:
if sig["freq_min_hz"] <= frequency_hz <= sig["freq_max_hz"]:
return sig["name"]
return "UNKNOWN"