diff --git a/app.py b/app.py index d24a4ce..a5374e2 100644 --- a/app.py +++ b/app.py @@ -317,6 +317,9 @@ deauth_detector = None deauth_detector_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) deauth_detector_lock = threading.Lock() +# Drone Intelligence +drone_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) + # ============================================ # GLOBAL STATE DICTIONARIES # ============================================ diff --git a/requirements.txt b/requirements.txt index 2567927..41232e7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,6 +30,7 @@ meshtastic>=2.0.0 # Deauthentication attack detection (optional - for WiFi TSCM) scapy>=2.4.5 +opendroneid>=1.0 # QR code generation for Meshtastic channels (optional) qrcode[pil]>=7.4 diff --git a/routes/__init__.py b/routes/__init__.py index ddb23d4..51a3eb0 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -18,6 +18,7 @@ def register_blueprints(app): from .bt_locate import bt_locate_bp from .controller import controller_bp from .correlation import correlation_bp + from .drone import drone_bp from .dsc import dsc_bp from .gps import gps_bp from .ground_station import ground_station_bp @@ -91,6 +92,7 @@ def register_blueprints(app): app.register_blueprint(system_bp) # System health monitoring app.register_blueprint(ook_bp) # Generic OOK signal decoder app.register_blueprint(ground_station_bp) # Ground station automation + app.register_blueprint(drone_bp) # Drone intelligence / UAV detection # Exempt all API blueprints from CSRF (they use JSON, not form tokens) if _csrf: @@ -99,5 +101,6 @@ def register_blueprints(app): # Initialize TSCM state with queue and lock from app import app as app_module - if hasattr(app_module, 'tscm_queue') and hasattr(app_module, 'tscm_lock'): + + if hasattr(app_module, "tscm_queue") and hasattr(app_module, "tscm_lock"): init_tscm_state(app_module.tscm_queue, app_module.tscm_lock) diff --git a/routes/drone.py b/routes/drone.py new file mode 100644 index 0000000..8303738 --- /dev/null +++ b/routes/drone.py @@ -0,0 +1,132 @@ +"""Drone intelligence routes — multi-vector UAV detection.""" + +from __future__ import annotations + +import logging +import queue +import threading + +from flask import Blueprint, Response, jsonify, request + +import app as app_module +from utils.constants import SSE_KEEPALIVE_INTERVAL, SSE_QUEUE_TIMEOUT +from utils.drone.correlator import DroneCorrelator +from utils.drone.remote_id import RemoteIDScanner +from utils.drone.rf_detector import RFDetector +from utils.sse import sse_stream_fanout +from utils.validation import validate_device_index + +logger = logging.getLogger("intercept.drone") + +drone_bp = Blueprint("drone", __name__, url_prefix="/drone") + +_correlator: DroneCorrelator | None = None +_remote_id_scanner: RemoteIDScanner | None = None +_rf_detector: RFDetector | None = None +_obs_queue: queue.Queue | None = None # raw observations from scanners/detectors +_relay_thread: threading.Thread | None = None +_drone_running = False +_drone_lock = threading.Lock() + +_SENTINEL = object() + + +def _relay_observations() -> None: + """Read raw observations from _obs_queue and feed them into the correlator.""" + while True: + obs = _obs_queue.get() + if obs is _SENTINEL: + break + if _correlator is not None: + _correlator.process(obs) + + +def _ensure_workers() -> None: + global _correlator, _remote_id_scanner, _rf_detector, _obs_queue, _relay_thread + if _obs_queue is None: + _obs_queue = queue.Queue(maxsize=512) + if _correlator is None: + _correlator = DroneCorrelator(output_queue=app_module.drone_queue) + if _remote_id_scanner is None: + _remote_id_scanner = RemoteIDScanner(output_queue=_obs_queue) + if _rf_detector is None: + _rf_detector = RFDetector(output_queue=_obs_queue) + if _relay_thread is None or not _relay_thread.is_alive(): + _relay_thread = threading.Thread(target=_relay_observations, daemon=True) + _relay_thread.start() + + +@drone_bp.route("/status") +def status(): + vectors = [] + if _remote_id_scanner and _remote_id_scanner.running: + vectors.append("REMOTE_ID") + if _rf_detector and _rf_detector.running: + vectors.append("RF") + return jsonify( + { + "running": _drone_running, + "vectors": vectors, + "contact_count": len(_correlator.get_all()) if _correlator else 0, + } + ) + + +@drone_bp.route("/contacts") +def contacts(): + if not _correlator: + return jsonify([]) + return jsonify(_correlator.get_all()) + + +@drone_bp.route("/start", methods=["POST"]) +def start(): + global _drone_running + body = request.json or {} + wifi_iface = body.get("wifi_iface") or None + try: + rtl_index = validate_device_index(body.get("rtl_sdr_index", 0)) + except ValueError as exc: + return jsonify({"error": str(exc)}), 400 + use_hackrf = bool(body.get("use_hackrf", True)) + + with _drone_lock: + _ensure_workers() + if not _drone_running: + if _remote_id_scanner: + _remote_id_scanner.start(wifi_iface=wifi_iface) + if _rf_detector: + _rf_detector.start(rtl_sdr_index=rtl_index, use_hackrf=use_hackrf) + _drone_running = True + logger.info("Drone detection started") + + return jsonify({"status": "ok", "running": True}) + + +@drone_bp.route("/stop", methods=["POST"]) +def stop(): + global _drone_running + with _drone_lock: + if _remote_id_scanner: + _remote_id_scanner.stop() + if _rf_detector: + _rf_detector.stop() + if _obs_queue is not None: + _obs_queue.put_nowait(_SENTINEL) + _drone_running = False + logger.info("Drone detection stopped") + return jsonify({"status": "ok", "running": False}) + + +@drone_bp.route("/stream") +def stream(): + return Response( + sse_stream_fanout( + source_queue=app_module.drone_queue, + channel_key="drone", + timeout=SSE_QUEUE_TIMEOUT, + keepalive_interval=SSE_KEEPALIVE_INTERVAL, + ), + mimetype="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) diff --git a/static/css/modes/drone.css b/static/css/modes/drone.css new file mode 100644 index 0000000..1e57e71 --- /dev/null +++ b/static/css/modes/drone.css @@ -0,0 +1,86 @@ +/* Drone Intelligence Styles */ + +.drone-vector-pills { + display: flex; + flex-wrap: wrap; + gap: 6px; + margin-top: 4px; +} + +.drone-vector-pill { + font-size: 10px; + font-family: var(--font-mono); + padding: 3px 8px; + border-radius: 3px; + background: var(--bg-primary); + color: var(--text-dim); + border: 1px solid var(--border-color); + transition: background 0.2s, color 0.2s; +} + +.drone-vector-pill.active { + background: color-mix(in srgb, var(--accent-cyan) 15%, transparent); + color: var(--accent-cyan); + border-color: var(--accent-cyan); +} + +.drone-contact-card { + background: var(--bg-card); + border: 1px solid var(--border-color); + border-radius: 4px; + padding: 10px 12px; + margin-bottom: 8px; + cursor: pointer; + transition: border-color 0.15s; +} + +.drone-contact-card:hover { + border-color: var(--accent-cyan); +} + +.drone-contact-card.high-risk { + border-left: 3px solid var(--accent-red); +} + +.drone-contact-card.medium-risk { + border-left: 3px solid var(--accent-yellow); +} + +.drone-contact-card.low-risk { + border-left: 3px solid var(--accent-green); +} + +.drone-compliance-badge { + font-size: 9px; + font-family: var(--font-mono); + padding: 2px 6px; + border-radius: 2px; + font-weight: 600; + text-transform: uppercase; +} + +.drone-compliance-badge.compliant { + background: color-mix(in srgb, var(--accent-green) 20%, transparent); + color: var(--accent-green); +} + +.drone-compliance-badge.non-compliant { + background: color-mix(in srgb, var(--accent-red) 20%, transparent); + color: var(--accent-red); +} + +.drone-map { + height: 280px; + border-radius: 4px; + border: 1px solid var(--border-color); + margin: 0 12px 12px; +} + +.drone-marker-high-risk { + animation: dsc-distress-pulse 1.5s infinite; +} + +@keyframes dsc-distress-pulse { + 0%, 100% { opacity: 1; transform: scale(1); } + 50% { opacity: 0.4; transform: scale(1.4); } +} diff --git a/static/js/modes/drone.js b/static/js/modes/drone.js new file mode 100644 index 0000000..d7b7777 --- /dev/null +++ b/static/js/modes/drone.js @@ -0,0 +1,203 @@ +(function DroneMode() { + 'use strict'; + + let _sse = null; + let _map = null; + let _markers = {}; + let _trails = {}; + let _running = false; + + function init() { + document.getElementById('droneStartBtn')?.addEventListener('click', _start); + document.getElementById('droneStopBtn')?.addEventListener('click', _stop); + _initMap(); + _connectSSE(); + _refreshStatus(); + } + + function _initMap() { + if (_map) return; + const mapEl = document.getElementById('droneMap'); + if (!mapEl || typeof L === 'undefined') return; + _map = L.map('droneMap', { zoomControl: true }).setView([20, 0], 2); + L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', { + attribution: '© OpenStreetMap', + maxZoom: 18, + }).addTo(_map); + } + + function destroy() { + _disconnectSSE(); + if (_map) { + _map.remove(); + _map = null; + } + _markers = {}; + _trails = {}; + } + + function _connectSSE() { + if (_sse) return; + _sse = new EventSource('/drone/stream'); + _sse.addEventListener('message', function (e) { + try { + const msg = JSON.parse(e.data); + if (msg.type === 'contact') _handleContact(msg.data); + } catch (_) {} + }); + _sse.onerror = function () { + _sse.close(); + _sse = null; + setTimeout(_connectSSE, 3000); + }; + } + + function _disconnectSSE() { + if (_sse) { _sse.close(); _sse = null; } + } + + function _handleContact(contact) { + _upsertCard(contact); + if (contact.position) _upsertMapMarker(contact); + _updateStats(); + } + + function _upsertCard(contact) { + const listEl = document.getElementById('droneContactList'); + if (!listEl) return; + let card = document.getElementById('drone-card-' + contact.id); + if (!card) { + card = document.createElement('div'); + card.id = 'drone-card-' + contact.id; + card.className = 'drone-contact-card'; + card.addEventListener('click', function () { _focusContact(contact.id); }); + listEl.prepend(card); + } + card.className = 'drone-contact-card ' + contact.risk_level + '-risk'; + const complianceLabel = contact.compliant + ? 'Remote ID' + : 'No Remote ID'; + const vectors = (contact.detection_vectors || []).map(function (v) { + return '' + v + ''; + }).join(''); + const alt = contact.altitude_m != null ? contact.altitude_m.toFixed(0) + 'm' : '—'; + const spd = contact.speed_ms != null ? contact.speed_ms.toFixed(1) + 'm/s' : '—'; + card.innerHTML = [ + '
', + ' ' + (contact.serial_number || contact.id) + '', + ' ' + complianceLabel, + '
', + '
' + vectors + '
', + '
Alt: ' + alt + '   Speed: ' + spd + '
', + ].join(''); + } + + function _upsertMapMarker(contact) { + if (!_map) return; + const lat = contact.position[0]; + const lon = contact.position[1]; + if (_markers[contact.id]) { + _markers[contact.id].setLatLng([lat, lon]); + } else { + const color = contact.risk_level === 'high' ? 'var(--accent-red)' : + contact.risk_level === 'medium' ? 'var(--accent-yellow)' : + 'var(--accent-cyan)'; + const icon = L.divIcon({ + className: 'drone-map-icon' + (contact.risk_level === 'high' ? ' drone-marker-high-risk' : ''), + html: '
', + iconSize: [10, 10], + iconAnchor: [5, 5], + }); + _markers[contact.id] = L.marker([lat, lon], { icon: icon }) + .addTo(_map) + .bindPopup('' + (contact.serial_number || contact.id) + '
Risk: ' + contact.risk_level); + } + const trailPoints = (contact.position_history || []).map(function (p) { + return [p.lat, p.lon]; + }); + if (_trails[contact.id]) { + _trails[contact.id].setLatLngs(trailPoints); + } else if (trailPoints.length > 1) { + _trails[contact.id] = L.polyline(trailPoints, { + color: contact.risk_level === 'high' ? '#ff4444' : '#00ccff', + weight: 1.5, + opacity: 0.6, + }).addTo(_map); + } + } + + function _focusContact(contactId) { + if (_map && _markers[contactId]) { + _map.panTo(_markers[contactId].getLatLng()); + _markers[contactId].openPopup(); + } + } + + function _updateStats() { + fetch('/drone/contacts') + .then(function (r) { return r.json(); }) + .then(function (contacts) { + const nonCompliant = contacts.filter(function (c) { return !c.compliant; }).length; + const countEl = document.getElementById('droneContactCount'); + const ncEl = document.getElementById('droneNonCompliantCount'); + if (countEl) countEl.textContent = contacts.length; + if (ncEl) ncEl.textContent = nonCompliant; + }) + .catch(function () {}); + } + + function _refreshStatus() { + fetch('/drone/status') + .then(function (r) { return r.json(); }) + .then(function (data) { + _running = data.running; + _setRunningUI(data.running); + _updateVectorPills(data.vectors || []); + }) + .catch(function () {}); + } + + function _start() { + const iface = document.getElementById('droneWifiIface')?.value.trim() || null; + fetch('/drone/start', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ wifi_iface: iface }), + }) + .then(function (r) { return r.json(); }) + .then(function () { _setRunningUI(true); _refreshStatus(); }) + .catch(function () {}); + } + + function _stop() { + fetch('/drone/stop', { method: 'POST' }) + .then(function () { _setRunningUI(false); _refreshStatus(); }) + .catch(function () {}); + } + + function _setRunningUI(running) { + const startBtn = document.getElementById('droneStartBtn'); + const stopBtn = document.getElementById('droneStopBtn'); + const statusEl = document.getElementById('droneStatusText'); + if (startBtn) startBtn.disabled = running; + if (stopBtn) stopBtn.disabled = !running; + if (statusEl) { + statusEl.textContent = running ? 'Active' : 'Standby'; + statusEl.style.color = running ? 'var(--accent-green)' : 'var(--accent-yellow)'; + } + } + + function _updateVectorPills(activeVectors) { + const pillMap = { + 'REMOTE_ID': 'dronePillRemoteId', + 'RTL433': 'dronePill433', + 'HACKRF': 'dronePillHackrf', + }; + Object.entries(pillMap).forEach(function ([key, id]) { + const el = document.getElementById(id); + if (el) el.classList.toggle('active', activeVectors.some(function (v) { return v.includes(key); })); + }); + } + + window.DroneMode = { init: init, destroy: destroy }; +})(); diff --git a/templates/index.html b/templates/index.html index 787ec8f..a16be38 100644 --- a/templates/index.html +++ b/templates/index.html @@ -102,7 +102,8 @@ radiosonde: "{{ url_for('static', filename='css/modes/radiosonde.css') }}", meteor: "{{ url_for('static', filename='css/modes/meteor.css') }}", system: "{{ url_for('static', filename='css/modes/system.css') }}", - ook: "{{ url_for('static', filename='css/modes/ook.css') }}" + ook: "{{ url_for('static', filename='css/modes/ook.css') }}", + drone: "{{ url_for('static', filename='css/modes/drone.css') }}" }; window.INTERCEPT_MODE_STYLE_LOADED = {}; window.INTERCEPT_MODE_STYLE_PROMISES = {}; @@ -186,7 +187,8 @@ spaceweather: "{{ url_for('static', filename='js/modes/space-weather.js') }}", system: "{{ url_for('static', filename='js/modes/system.js') }}", meteor: "{{ url_for('static', filename='js/modes/meteor.js') }}", - waterfall: "{{ url_for('static', filename='js/modes/waterfall.js') }}?v={{ version }}&r=wfdeck21" + waterfall: "{{ url_for('static', filename='js/modes/waterfall.js') }}?v={{ version }}&r=wfdeck21", + drone: "{{ url_for('static', filename='js/modes/drone.js') }}" }; window.INTERCEPT_MODE_SCRIPT_LOADED = {}; window.INTERCEPT_MODE_SCRIPT_PROMISES = {}; @@ -764,6 +766,8 @@ {% include 'partials/modes/ais.html' %} + {% include 'partials/modes/drone.html' %} + {% include 'partials/modes/radiosonde.html' %} {% include 'partials/modes/spy-stations.html' %} @@ -3767,6 +3771,7 @@ wifi_locate: { label: 'WiFi Locate', indicator: 'WF LOCATE', outputTitle: 'WiFi Locate', group: 'wireless' }, meshtastic: { label: 'Meshtastic', indicator: 'MESHTASTIC', outputTitle: 'Meshtastic Mesh Monitor', group: 'wireless' }, tscm: { label: 'TSCM', indicator: 'TSCM', outputTitle: 'TSCM Counter-Surveillance', group: 'intel' }, + drone: { label: 'Drone Intel', indicator: 'DRONE', outputTitle: 'Drone Intelligence', group: 'intel' }, spystations: { label: 'Spy Stations', indicator: 'SPY STATIONS', outputTitle: 'Spy Stations', group: 'intel' }, websdr: { label: 'WebSDR', indicator: 'WEBSDR', outputTitle: 'HF/Shortwave WebSDR', group: 'intel' }, waterfall: { label: 'Waterfall', indicator: 'WATERFALL', outputTitle: 'Spectrum Waterfall', group: 'signals' }, @@ -4403,6 +4408,7 @@ tscm: () => { if (tscmEventSource) { tscmEventSource.close(); tscmEventSource = null; } }, meteor: () => typeof MeteorScatter !== 'undefined' && MeteorScatter.destroy?.(), ook: () => typeof OokMode !== 'undefined' && OokMode.destroy?.(), + drone: () => typeof DroneMode !== 'undefined' && DroneMode.destroy?.(), }; return moduleDestroyMap[mode] || null; } @@ -4713,6 +4719,7 @@ document.getElementById('aprsMode')?.classList.toggle('active', mode === 'aprs'); document.getElementById('tscmMode')?.classList.toggle('active', mode === 'tscm'); document.getElementById('aisMode')?.classList.toggle('active', mode === 'ais'); + document.getElementById('droneMode')?.classList.toggle('active', mode === 'drone'); document.getElementById('radiosondeMode')?.classList.toggle('active', mode === 'radiosonde'); document.getElementById('spystationsMode')?.classList.toggle('active', mode === 'spystations'); document.getElementById('meshtasticMode')?.classList.toggle('active', mode === 'meshtastic'); @@ -5011,6 +5018,8 @@ SystemHealth.init(); } else if (mode === 'ook') { OokMode.init(); + } else if (mode === 'drone') { + if (typeof DroneMode !== 'undefined') DroneMode.init(); } if (requestId !== modeSwitchRequestId) return; diff --git a/templates/partials/modes/drone.html b/templates/partials/modes/drone.html new file mode 100644 index 0000000..c39babf --- /dev/null +++ b/templates/partials/modes/drone.html @@ -0,0 +1,49 @@ + + diff --git a/tests/test_drone_correlator.py b/tests/test_drone_correlator.py new file mode 100644 index 0000000..23bcc23 --- /dev/null +++ b/tests/test_drone_correlator.py @@ -0,0 +1,134 @@ +# tests/test_drone_correlator.py +import queue +import time +from datetime import datetime, timezone + +import pytest + +from utils.drone.correlator import DroneCorrelator +from utils.drone.models import RemoteIDObservation, RFObservation + + +def _now(): + return datetime.now(timezone.utc) + + +def _remote_id_obs(serial="SN001", lat=51.5, lon=-0.1): + return RemoteIDObservation( + source="WIFI", + serial_number=serial, + operator_id="OP001", + lat=lat, + lon=lon, + altitude_m=50.0, + speed_ms=5.0, + heading=90.0, + timestamp=_now(), + ) + + +def _rf_obs(freq=433_920_000, proto="FRSKY", rssi=-70.0): + return RFObservation( + frequency_hz=freq, + protocol=proto, + rssi=rssi, + hardware="RTL433", + timestamp=_now(), + ) + + +@pytest.fixture +def correlator(): + q = queue.Queue() + return DroneCorrelator(output_queue=q), q + + +def test_remote_id_creates_contact(correlator): + corr, q = correlator + corr.process(_remote_id_obs()) + contacts = corr.get_all() + assert len(contacts) == 1 + assert contacts[0]["compliant"] is True + assert contacts[0]["serial_number"] == "SN001" + assert contacts[0]["position"] == [51.5, -0.1] + + +def test_rf_creates_contact(correlator): + corr, q = correlator + corr.process(_rf_obs()) + contacts = corr.get_all() + assert len(contacts) == 1 + assert contacts[0]["compliant"] is False + + +def test_remote_id_emits_sse_event(correlator): + corr, q = correlator + corr.process(_remote_id_obs()) + msg = q.get_nowait() + assert msg["type"] == "contact" + assert msg["data"]["serial_number"] == "SN001" + + +def test_same_serial_updates_contact(correlator): + corr, q = correlator + corr.process(_remote_id_obs(lat=51.5, lon=-0.1)) + corr.process(_remote_id_obs(lat=51.6, lon=-0.2)) + contacts = corr.get_all() + assert len(contacts) == 1 + assert contacts[0]["position"] == [51.6, -0.2] + + +def test_different_serials_create_separate_contacts(correlator): + corr, q = correlator + corr.process(_remote_id_obs(serial="SN001")) + corr.process(_remote_id_obs(serial="SN002")) + contacts = corr.get_all() + assert len(contacts) == 2 + + +def test_position_history_grows(correlator): + corr, q = correlator + for i in range(5): + corr.process(_remote_id_obs(lat=51.0 + i * 0.01, lon=-0.1)) + contacts = corr.get_all() + assert len(contacts[0]["position_history"]) == 5 + + +def test_position_history_capped_at_500(correlator): + corr, q = correlator + for i in range(510): + corr.process(_remote_id_obs(lat=float(i), lon=0.0)) + store_values = list(corr._store.values()) + assert len(store_values[0].position_history) == 500 + + +def test_compliant_single_vector_is_low_risk(correlator): + corr, q = correlator + corr.process(_remote_id_obs()) + contacts = corr.get_all() + assert contacts[0]["risk_level"] == "low" + + +def test_non_compliant_is_high_risk(correlator): + corr, q = correlator + corr.process(_rf_obs()) + contacts = corr.get_all() + assert contacts[0]["risk_level"] == "high" + + +def test_confidence_increases_with_vectors(correlator): + corr, q = correlator + corr.process(_remote_id_obs()) + contacts = {c["id"]: c for c in corr.get_all()} + rid_contact = next(c for c in contacts.values() if c["compliant"]) + assert rid_contact["confidence"] == 0.25 # 1/4 + + +def test_ttl_expiry_removes_contact(correlator): + corr, q = correlator + corr.process(_remote_id_obs()) + assert len(corr.get_all()) == 1 + for key in corr._store.timestamps: + corr._store.timestamps[key] = time.time() - 300 + corr._store.cleanup() + assert len(corr.get_all()) == 0 diff --git a/tests/test_drone_models.py b/tests/test_drone_models.py new file mode 100644 index 0000000..4577c7a --- /dev/null +++ b/tests/test_drone_models.py @@ -0,0 +1,67 @@ +# tests/test_drone_models.py +from datetime import datetime, timezone + +from utils.drone.models import DroneContact, RFSignal +from utils.drone.signatures import match_signature + + +def _now(): + return datetime.now(timezone.utc) + + +def test_drone_contact_to_dict_minimal(): + c = DroneContact(id="abc123", first_seen=_now(), last_seen=_now()) + d = c.to_dict() + assert d["id"] == "abc123" + assert d["compliant"] is False + assert d["risk_level"] == "low" + assert d["detection_vectors"] == [] + assert d["position"] is None + + +def test_drone_contact_to_dict_with_position(): + c = DroneContact(id="xyz", first_seen=_now(), last_seen=_now()) + c.position = (51.5, -0.1) + c.serial_number = "SN001" + c.compliant = True + c.detection_vectors = {"REMOTE_ID_WIFI"} + d = c.to_dict() + assert d["position"] == [51.5, -0.1] + assert d["serial_number"] == "SN001" + assert d["detection_vectors"] == ["REMOTE_ID_WIFI"] + + +def test_drone_contact_position_history_capped(): + c = DroneContact(id="cap", first_seen=_now(), last_seen=_now()) + for i in range(510): + c.position_history.append((float(i), float(i), _now())) + d = c.to_dict() + # to_dict sends last 50 + assert len(d["position_history"]) == 50 + + +def test_rf_signal_fields(): + s = RFSignal(frequency_hz=433_920_000, protocol="FRSKY", rssi=-65.0, hardware="RTL433", timestamp=_now()) + assert s.frequency_hz == 433_920_000 + assert s.protocol == "FRSKY" + + +def test_match_signature_frsky_433(): + assert match_signature(433_920_000) == "FRSKY" + + +def test_match_signature_ocusync_24(): + assert match_signature(2_440_000_000) == "DJI_OCUSYNC" + + +def test_match_signature_fpv_58(): + assert match_signature(5_800_000_000) == "FPV_VIDEO" + + +def test_match_signature_ocusync_at_2450mhz(): + # 2,450 MHz is within the DJI_OCUSYNC band + assert match_signature(2_450_000_000) == "DJI_OCUSYNC" + + +def test_match_signature_unrecognised(): + assert match_signature(100_000_000) == "UNKNOWN" diff --git a/tests/test_drone_remote_id.py b/tests/test_drone_remote_id.py new file mode 100644 index 0000000..05bc73b --- /dev/null +++ b/tests/test_drone_remote_id.py @@ -0,0 +1,92 @@ +# tests/test_drone_remote_id.py +import queue +import struct +from unittest.mock import MagicMock, patch + +from utils.drone.remote_id import RemoteIDScanner, _parse_ble_remote_id, _parse_wifi_remote_id + + +def _make_location_payload(lat=51.5, lon=-0.1, alt=50.0, speed=5.0, heading=90.0) -> bytes: + """Craft a minimal ASTM F3411 Location message (message type 0x01).""" + msg_type = 0x01 + status = 0x00 + lat_enc = int(lat * 1e7) + lon_enc = int(lon * 1e7) + alt_enc = int((alt + 1000) / 0.5) + speed_enc = int(speed / 0.25) + heading_enc = int(heading / 0.01) + return struct.pack(" bytes: + msg_type = 0x00 + id_type = 0x01 + serial_bytes = serial.encode("ascii").ljust(20, b"\x00")[:20] + return bytes([msg_type, id_type]) + serial_bytes + + +def _make_ble_adv_with_remote_id(payload: bytes) -> bytes: + uuid_bytes = b"\xfa\xff" + service_data_type = 0x16 + length = len(uuid_bytes) + len(payload) + 1 + return bytes([length, service_data_type]) + uuid_bytes + payload + + +def test_parse_ble_location_returns_observation(): + payload = _make_location_payload(lat=51.5, lon=-0.1, alt=50.0, speed=5.0, heading=90.0) + adv = _make_ble_adv_with_remote_id(payload) + obs = _parse_ble_remote_id(adv) + assert obs is not None + assert obs.source == "BLE" + assert abs(obs.lat - 51.5) < 0.0001 + assert abs(obs.lon - (-0.1)) < 0.0001 + assert abs(obs.altitude_m - 50.0) < 1.0 + assert abs(obs.speed_ms - 5.0) < 0.5 + + +def test_parse_ble_no_uuid_returns_none(): + obs = _parse_ble_remote_id(b"\x00\x01\x02\x03") + assert obs is None + + +def test_parse_ble_too_short_returns_none(): + adv = _make_ble_adv_with_remote_id(b"\x01\x00") + obs = _parse_ble_remote_id(adv) + assert obs is None + + +def test_parse_wifi_remote_id_returns_observation(): + payload = _make_location_payload(lat=52.0, lon=0.5) + obs = _parse_wifi_remote_id(payload) + assert obs is not None + assert obs.source == "WIFI" + assert abs(obs.lat - 52.0) < 0.0001 + + +def test_parse_wifi_non_location_returns_none(): + payload = _make_basic_id_payload() + obs = _parse_wifi_remote_id(payload) + assert obs is None + + +def test_scanner_start_stop(): + q = queue.Queue() + scanner = RemoteIDScanner(output_queue=q) + with ( + patch("utils.drone.remote_id.SCAPY_AVAILABLE", True), + patch("utils.drone.remote_id.AsyncSniffer") as mock_sniffer, + ): + mock_sniffer.return_value = MagicMock() + scanner.start(wifi_iface="wlan0mon") + assert scanner.running + scanner.stop() + assert not scanner.running + + +def test_scanner_start_without_scapy_still_works(): + q = queue.Queue() + scanner = RemoteIDScanner(output_queue=q) + with patch("utils.drone.remote_id.SCAPY_AVAILABLE", False): + scanner.start(wifi_iface=None) + assert scanner.running + scanner.stop() diff --git a/tests/test_drone_rf_detector.py b/tests/test_drone_rf_detector.py new file mode 100644 index 0000000..6ee654c --- /dev/null +++ b/tests/test_drone_rf_detector.py @@ -0,0 +1,94 @@ +"""Tests for RFDetector (rtl_433 + hackrf_sweep control-link detection).""" + +from __future__ import annotations + +import json +import queue +from unittest.mock import MagicMock, patch + +import pytest + +from utils.drone.models import RFObservation +from utils.drone.rf_detector import RFDetector + + +@pytest.fixture +def detector(): + q = queue.Queue() + return RFDetector(output_queue=q), q + + +def test_detector_not_running_initially(detector): + det, q = detector + assert not det.running + + +def test_rtl433_json_line_emits_observation(detector): + det, q = detector + rtl433_line = json.dumps( + { + "freq": 433920000, + "rssi": -68.5, + "protocol": "FrSky", + } + ) + det._handle_rtl433_line(rtl433_line) + obs = q.get_nowait() + assert isinstance(obs, RFObservation) + assert obs.frequency_hz == 433_920_000 + assert obs.hardware == "RTL433" + assert obs.rssi == -68.5 + + +def test_rtl433_non_json_line_ignored(detector): + det, q = detector + det._handle_rtl433_line("not json at all") + assert q.empty() + + +def test_hackrf_sweep_line_emits_observation(detector): + det, q = detector + # hackrf_sweep CSV: date, time, hz_low, hz_high, hz_bin_width, num_samples, db, db, ... + hz_low = 2_440_000_000 + hz_high = 2_441_000_000 + sweep_line = f"2026-05-03, 12:00:00, {hz_low}, {hz_high}, 1000000, 10, -45.2, -46.1, -44.8" + det._handle_hackrf_line(sweep_line) + obs = q.get_nowait() + assert isinstance(obs, RFObservation) + assert obs.hardware == "HACKRF" + assert obs.frequency_hz == (hz_low + hz_high) // 2 + assert obs.rssi < 0 + + +def test_hackrf_sweep_below_threshold_ignored(detector): + det, q = detector + hz_low = 2_440_000_000 + hz_high = 2_441_000_000 + # Very low power — should be ignored (below -90 dBm threshold) + sweep_line = f"2026-05-03, 12:00:00, {hz_low}, {hz_high}, 1000000, 10, -95.0, -96.0, -95.5" + det._handle_hackrf_line(sweep_line) + assert q.empty() + + +def test_out_of_band_frequency_ignored(detector): + det, q = detector + # 915 MHz is not in any drone band + line = json.dumps({"freq": 915_000_000, "rssi": -50.0, "protocol": "Generic"}) + det._handle_rtl433_line(line) + assert q.empty() + + +def test_start_stop(detector): + det, q = detector + mock_proc = MagicMock() + mock_proc.stdout = MagicMock() + mock_proc.stdout.readline = MagicMock(side_effect=[b""]) + # Patch both shutil.which calls (rtl_433 in _run_rtl433, hackrf_sweep in _run_hackrf) + with ( + patch("subprocess.Popen", return_value=mock_proc), + patch("utils.drone.rf_detector.shutil.which", return_value=None), + ): + det.start(rtl_sdr_index=0, use_hackrf=False) + assert det.running + det.stop() + assert not det.running diff --git a/tests/test_drone_routes.py b/tests/test_drone_routes.py new file mode 100644 index 0000000..a7998f9 --- /dev/null +++ b/tests/test_drone_routes.py @@ -0,0 +1,63 @@ +import json +import queue +from unittest.mock import patch + +import pytest +from flask import Flask + +import app as app_module +from routes.drone import drone_bp + + +@pytest.fixture(autouse=True) +def mock_app_state(mocker): + mocker.patch.object(app_module, "drone_queue", queue.Queue()) + yield + + +@pytest.fixture +def drone_app(): + app = Flask(__name__) + app.register_blueprint(drone_bp) + app.config["TESTING"] = True + return app + + +@pytest.fixture +def client(drone_app): + return drone_app.test_client() + + +def test_status_returns_json(client): + resp = client.get("/drone/status") + assert resp.status_code == 200 + data = json.loads(resp.data) + assert "running" in data + assert "vectors" in data + + +def test_contacts_returns_empty_list_when_idle(client): + resp = client.get("/drone/contacts") + assert resp.status_code == 200 + data = json.loads(resp.data) + assert data == [] or isinstance(data, list) + + +def test_start_returns_ok(client): + with ( + patch("routes.drone._correlator"), + patch("routes.drone._remote_id_scanner"), + patch("routes.drone._rf_detector"), + ): + resp = client.post("/drone/start", json={}) + assert resp.status_code == 200 + + +def test_stop_returns_ok(client): + resp = client.post("/drone/stop") + assert resp.status_code == 200 + + +def test_stream_returns_event_stream(client): + resp = client.get("/drone/stream") + assert resp.content_type.startswith("text/event-stream") diff --git a/utils/drone/__init__.py b/utils/drone/__init__.py new file mode 100644 index 0000000..60b3f3a --- /dev/null +++ b/utils/drone/__init__.py @@ -0,0 +1,5 @@ +"""Drone intelligence utilities — multi-vector UAV detection.""" + +from .models import DroneContact, RemoteIDObservation, RFObservation, RFSignal + +__all__ = ["DroneContact", "RemoteIDObservation", "RFObservation", "RFSignal"] diff --git a/utils/drone/correlator.py b/utils/drone/correlator.py new file mode 100644 index 0000000..cdade25 --- /dev/null +++ b/utils/drone/correlator.py @@ -0,0 +1,87 @@ +# utils/drone/correlator.py +from __future__ import annotations + +import contextlib +import hashlib +import queue +from datetime import datetime, timezone + +from utils.cleanup import DataStore, cleanup_manager + +from .models import DroneContact, RemoteIDObservation, RFObservation, RFSignal + +_CONTACT_TTL = 120.0 +_MAX_POSITION_HISTORY = 500 + + +def _contact_id_from_serial(serial: str) -> str: + return hashlib.sha1(f"serial:{serial}".encode()).hexdigest()[:12] + + +def _contact_id_from_rf(freq_hz: int, protocol: str) -> str: + return hashlib.sha1(f"rf:{freq_hz}:{protocol}".encode()).hexdigest()[:12] + + +def _compute_risk(contact: DroneContact) -> str: + if not contact.compliant: + return "high" + if len(contact.detection_vectors) > 1: + return "medium" + if len(contact.rf_signals) >= 2: + recent = sorted(contact.rf_signals, key=lambda s: s.timestamp)[-5:] + if abs(recent[-1].rssi - recent[0].rssi) > 15: + return "medium" + return "low" + + +class DroneCorrelator: + def __init__(self, output_queue: queue.Queue) -> None: + self._store: DataStore = DataStore(max_age_seconds=_CONTACT_TTL, name="drone_contacts") + self._output_queue = output_queue + cleanup_manager.register(self._store) + + def process(self, obs: RemoteIDObservation | RFObservation) -> None: + now = datetime.now(timezone.utc) + + if isinstance(obs, RemoteIDObservation): + contact_id = _contact_id_from_serial(obs.serial_number) + contact: DroneContact = self._store.get(contact_id) or DroneContact( + id=contact_id, first_seen=now, last_seen=now + ) + contact.last_seen = now + contact.serial_number = obs.serial_number + contact.operator_id = obs.operator_id + contact.position = (obs.lat, obs.lon) + contact.altitude_m = obs.altitude_m + contact.speed_ms = obs.speed_ms + contact.heading = obs.heading + contact.compliant = True + contact.detection_vectors.add(f"REMOTE_ID_{obs.source}") + contact.position_history.append((obs.lat, obs.lon, now)) + if len(contact.position_history) > _MAX_POSITION_HISTORY: + contact.position_history = contact.position_history[-_MAX_POSITION_HISTORY:] + else: + contact_id = _contact_id_from_rf(obs.frequency_hz, obs.protocol) + contact = self._store.get(contact_id) or DroneContact(id=contact_id, first_seen=now, last_seen=now) + contact.last_seen = now + contact.compliant = False + contact.detection_vectors.add(obs.hardware) + contact.rf_signals.append( + RFSignal( + frequency_hz=obs.frequency_hz, + protocol=obs.protocol, + rssi=obs.rssi, + hardware=obs.hardware, + timestamp=now, + ) + ) + + contact.confidence = min(len(contact.detection_vectors) / 4.0, 1.0) + contact.risk_level = _compute_risk(contact) + self._store.set(contact_id, contact) + + with contextlib.suppress(queue.Full): + self._output_queue.put_nowait({"type": "contact", "data": contact.to_dict()}) + + def get_all(self) -> list[dict]: + return [c.to_dict() for c in self._store.values()] diff --git a/utils/drone/models.py b/utils/drone/models.py new file mode 100644 index 0000000..2f9614b --- /dev/null +++ b/utils/drone/models.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime + +_MAX_HISTORY_IN_DICT = 50 +_MAX_RF_IN_DICT = 10 + + +@dataclass +class RFSignal: + frequency_hz: int + protocol: str + rssi: float + hardware: str # "RTL433" | "HACKRF" + timestamp: datetime + + +@dataclass +class RemoteIDObservation: + source: str # "WIFI" | "BLE" + serial_number: str + operator_id: str + lat: float + lon: float + altitude_m: float + speed_ms: float + heading: float + timestamp: datetime + + +@dataclass +class RFObservation: + frequency_hz: int + protocol: str + rssi: float + hardware: str # "RTL433" | "HACKRF" + timestamp: datetime + + +@dataclass +class DroneContact: + id: str + first_seen: datetime + last_seen: datetime + serial_number: str | None = None + operator_id: str | None = None + position: tuple[float, float] | None = None + altitude_m: float | None = None + speed_ms: float | None = None + heading: float | None = None + position_history: list[tuple[float, float, datetime]] = field(default_factory=list) + rf_signals: list[RFSignal] = field(default_factory=list) + compliant: bool = False + detection_vectors: set[str] = field(default_factory=set) + confidence: float = 0.0 + risk_level: str = "low" + + def to_dict(self) -> dict: + return { + "id": self.id, + "first_seen": self.first_seen.isoformat(), + "last_seen": self.last_seen.isoformat(), + "serial_number": self.serial_number, + "operator_id": self.operator_id, + "position": list(self.position) if self.position else None, + "altitude_m": self.altitude_m, + "speed_ms": self.speed_ms, + "heading": self.heading, + "position_history": [ + {"lat": p[0], "lon": p[1], "ts": p[2].isoformat()} + for p in self.position_history[-_MAX_HISTORY_IN_DICT:] + ], + "rf_signals": [ + { + "frequency_hz": s.frequency_hz, + "protocol": s.protocol, + "rssi": s.rssi, + "hardware": s.hardware, + } + for s in self.rf_signals[-_MAX_RF_IN_DICT:] + ], + "compliant": self.compliant, + "detection_vectors": sorted(self.detection_vectors), + "confidence": round(self.confidence, 2), + "risk_level": self.risk_level, + } diff --git a/utils/drone/remote_id.py b/utils/drone/remote_id.py new file mode 100644 index 0000000..778e0de --- /dev/null +++ b/utils/drone/remote_id.py @@ -0,0 +1,125 @@ +# utils/drone/remote_id.py +"""Remote ID scanner — WiFi beacon + BLE advertisement parsing (ASTM F3411).""" + +from __future__ import annotations + +import contextlib +import logging +import queue +import struct +from datetime import datetime, timezone + +from .models import RemoteIDObservation + +logger = logging.getLogger("intercept.drone.remote_id") + +_REMOTE_ID_UUID_LE = b"\xfa\xff" +_LOCATION_MSG_TYPE = 0x01 +_MIN_LOCATION_PAYLOAD = 15 + +try: + from scapy.all import AsyncSniffer, Dot11Beacon, Dot11Elt + + SCAPY_AVAILABLE = True +except ImportError: + SCAPY_AVAILABLE = False + AsyncSniffer = None + Dot11Beacon = Dot11Elt = None + + +def _parse_ble_remote_id(adv_data: bytes) -> RemoteIDObservation | None: + """Parse a BLE advertisement containing an ASTM F3411 Remote ID payload.""" + idx = adv_data.find(_REMOTE_ID_UUID_LE) + if idx < 0: + return None + payload = adv_data[idx + 2 :] + return _parse_wifi_remote_id(payload, source="BLE") + + +def _parse_wifi_remote_id(payload: bytes, source: str = "WIFI") -> RemoteIDObservation | None: + """Parse raw ASTM F3411 Location payload bytes into a RemoteIDObservation.""" + if not payload or len(payload) < 2: + return None + msg_type = payload[0] & 0x0F + if msg_type != _LOCATION_MSG_TYPE: + return None + if len(payload) < _MIN_LOCATION_PAYLOAD: + return None + try: + lat_enc, lon_enc = struct.unpack_from(" None: + self._queue = output_queue + self._sniffer = None + self._running = False + + @property + def running(self) -> bool: + return self._running + + def _on_wifi_packet(self, pkt) -> None: + if not (Dot11Beacon and pkt.haslayer(Dot11Beacon)): + return + elt = pkt.getlayer(Dot11Elt) + while elt: + if elt.ID == 221 and elt.info: + obs = _parse_wifi_remote_id(elt.info) + if obs: + with contextlib.suppress(queue.Full): + self._queue.put_nowait(obs) + elt = elt.payload if hasattr(elt, "payload") and isinstance(elt.payload, Dot11Elt) else None + + def start(self, wifi_iface: str | None = None) -> None: + if self._running: + return + self._running = True + if SCAPY_AVAILABLE and wifi_iface: + try: + sniffer = AsyncSniffer( + iface=wifi_iface, + filter="type mgt subtype beacon", + prn=self._on_wifi_packet, + store=False, + ) + sniffer.start() + self._sniffer = sniffer + logger.info("WiFi Remote ID sniffer started on %s", wifi_iface) + except Exception as exc: + logger.warning("WiFi Remote ID sniffer failed to start: %s", exc) + else: + logger.info("WiFi Remote ID unavailable (scapy=%s, iface=%s)", SCAPY_AVAILABLE, wifi_iface) + + def stop(self) -> None: + self._running = False + if self._sniffer: + with contextlib.suppress(Exception): + self._sniffer.stop() + self._sniffer = None diff --git a/utils/drone/rf_detector.py b/utils/drone/rf_detector.py new file mode 100644 index 0000000..eea689f --- /dev/null +++ b/utils/drone/rf_detector.py @@ -0,0 +1,161 @@ +"""RF control-link detector — rtl_433 (433/868MHz) + hackrf_sweep (2.4/5.8GHz).""" + +from __future__ import annotations + +import contextlib +import json +import logging +import queue +import shutil +import subprocess +import threading +from datetime import datetime, timezone + +from utils.process import register_process, safe_terminate + +from .models import RFObservation +from .signatures import match_signature + +logger = logging.getLogger("intercept.drone.rf_detector") + +_HACKRF_THRESHOLD_DBM = -90.0 +_DRONE_FREQ_RANGES_HZ = [ + (433_000_000, 435_000_000), + (868_000_000, 869_000_000), + (2_400_000_000, 2_484_000_000), + (5_725_000_000, 5_875_000_000), +] + + +def _in_drone_band(freq_hz: int) -> bool: + return any(lo <= freq_hz <= hi for lo, hi in _DRONE_FREQ_RANGES_HZ) + + +class RFDetector: + def __init__(self, output_queue: queue.Queue) -> None: + self._queue = output_queue + self._stop_event = threading.Event() + self._stop_event.set() # starts in stopped state + self._proc_lock = threading.Lock() + self._rtl_proc: subprocess.Popen | None = None + self._hackrf_proc: subprocess.Popen | None = None + self._threads: list[threading.Thread] = [] + + @property + def running(self) -> bool: + return not self._stop_event.is_set() + + def _handle_rtl433_line(self, line: str) -> None: + try: + data = json.loads(line) + except (json.JSONDecodeError, ValueError): + return + freq = data.get("freq") + rssi = data.get("rssi") + if freq is None or rssi is None: + return + freq_hz = int(float(freq)) + if not _in_drone_band(freq_hz): + return + protocol = match_signature(freq_hz) + with contextlib.suppress(queue.Full): + self._queue.put_nowait( + RFObservation( + frequency_hz=freq_hz, + protocol=protocol, + rssi=float(rssi), + hardware="RTL433", + timestamp=datetime.now(timezone.utc), + ) + ) + + def _handle_hackrf_line(self, line: str) -> None: + parts = [p.strip() for p in line.split(",")] + if len(parts) < 7: + return + try: + hz_low = int(parts[2]) + hz_high = int(parts[3]) + db_values = [float(p) for p in parts[6:] if p] + except (ValueError, IndexError): + return + if not db_values: + return + avg_db = sum(db_values) / len(db_values) + if avg_db < _HACKRF_THRESHOLD_DBM: + return + freq_hz = (hz_low + hz_high) // 2 + if not _in_drone_band(freq_hz): + return + protocol = match_signature(freq_hz) + with contextlib.suppress(queue.Full): + self._queue.put_nowait( + RFObservation( + frequency_hz=freq_hz, + protocol=protocol, + rssi=avg_db, + hardware="HACKRF", + timestamp=datetime.now(timezone.utc), + ) + ) + + def _run_rtl433(self, device_index: int) -> None: + rtl_bin = shutil.which("rtl_433") + if not rtl_bin: + logger.warning("rtl_433 not found — RTL-SDR RF detection disabled") + return + cmd = [rtl_bin, "-d", str(device_index), "-F", "json", "-f", "433920000", "-f", "868300000"] + try: + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + register_process(proc) + with self._proc_lock: + self._rtl_proc = proc + for raw_line in iter(proc.stdout.readline, b""): + if self._stop_event.is_set(): + break + self._handle_rtl433_line(raw_line.decode("utf-8", errors="replace").strip()) + safe_terminate(proc) + except Exception as exc: + logger.warning("rtl_433 error: %s", exc) + + def _run_hackrf(self) -> None: + hackrf_bin = shutil.which("hackrf_sweep") + if not hackrf_bin: + logger.warning("hackrf_sweep not found — HackRF RF detection disabled") + return + cmd = [hackrf_bin, "-f", "2400:2484", "-f", "5725:5875", "-w", "1000000"] + try: + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + register_process(proc) + with self._proc_lock: + self._hackrf_proc = proc + for raw_line in iter(proc.stdout.readline, b""): + if self._stop_event.is_set(): + break + self._handle_hackrf_line(raw_line.decode("utf-8", errors="replace").strip()) + safe_terminate(proc) + except Exception as exc: + logger.warning("hackrf_sweep error: %s", exc) + + def start(self, rtl_sdr_index: int = 0, use_hackrf: bool = True) -> None: + if self.running: + return + self._stop_event.clear() + t1 = threading.Thread(target=self._run_rtl433, args=(rtl_sdr_index,), daemon=True) + t1.start() + self._threads.append(t1) + if use_hackrf: + t2 = threading.Thread(target=self._run_hackrf, daemon=True) + t2.start() + self._threads.append(t2) + + def stop(self) -> None: + self._stop_event.set() + with self._proc_lock: + rtl_proc = self._rtl_proc + hackrf_proc = self._hackrf_proc + self._rtl_proc = None + self._hackrf_proc = None + safe_terminate(rtl_proc) + safe_terminate(hackrf_proc) + self._threads.clear() diff --git a/utils/drone/signatures.py b/utils/drone/signatures.py new file mode 100644 index 0000000..f9f2743 --- /dev/null +++ b/utils/drone/signatures.py @@ -0,0 +1,34 @@ +"""Drone RF protocol signature table and frequency matcher.""" + +from __future__ import annotations + +_SIGNATURES = [ + { + "name": "FRSKY", + "freq_min_hz": 433_050_000, + "freq_max_hz": 434_790_000, + }, + { + "name": "FRSKY_868", + "freq_min_hz": 868_000_000, + "freq_max_hz": 868_600_000, + }, + { + "name": "DJI_OCUSYNC", + "freq_min_hz": 2_400_000_000, + "freq_max_hz": 2_483_500_000, + }, + { + "name": "FPV_VIDEO", + "freq_min_hz": 5_725_000_000, + "freq_max_hz": 5_875_000_000, + }, +] + + +def match_signature(frequency_hz: int) -> str: + """Return the protocol name for a detected frequency, or 'UNKNOWN'.""" + for sig in _SIGNATURES: + if sig["freq_min_hz"] <= frequency_hz <= sig["freq_max_hz"]: + return sig["name"] + return "UNKNOWN"