Files
intercept/tests/test_drone_rf_detector.py
T
James Smith e8b94b6efc feat(drone): add RFDetector for rtl_433 and hackrf_sweep control-link detection
Implements RFDetector class that wraps rtl_433 (433/868MHz) and hackrf_sweep
(2.4/5.8GHz) subprocesses, emitting RFObservation objects onto a shared queue.
Includes signature matching, frequency band validation, and power thresholding.

- _handle_rtl433_line(): Parse JSON output, filter drone bands, emit observations
- _handle_hackrf_line(): Parse CSV output, average power levels, threshold at -90dBm
- start()/stop(): Manage subprocess threads for concurrent RF detection
- Graceful handling of missing tools (rtl_433, hackrf_sweep)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-03 12:52:03 +01:00

84 lines
2.3 KiB
Python

"""Tests for RFDetector (rtl_433 + hackrf_sweep control-link detection)."""
from __future__ import annotations
import json
import queue
from unittest.mock import MagicMock, patch
import pytest
from utils.drone.models import RFObservation
from utils.drone.rf_detector import RFDetector
@pytest.fixture
def detector():
    """Provide a fresh ``RFDetector`` together with the queue it writes to.

    Yields:
        A ``(detector, queue)`` tuple; the detector is constructed with the
        queue passed as its ``output_queue``.

    On teardown, a detector that still reports ``running`` is stopped, so a
    test that fails between ``start()`` and ``stop()`` does not leave the
    detector active for subsequent tests.
    """
    q = queue.Queue()
    det = RFDetector(output_queue=q)
    yield det, q
    # Teardown: only call stop() when the detector was actually left running,
    # so tests that never start it (or already stopped it) are unaffected.
    if det.running:
        det.stop()
def test_detector_not_running_initially(detector):
    """A newly constructed detector must not report itself as running."""
    rf_detector, _ = detector
    assert not rf_detector.running
def test_rtl433_json_line_emits_observation(detector):
    """A JSON rtl_433 line at 433.92 MHz puts one RFObservation on the queue.

    The observation must carry the frequency and RSSI from the JSON payload
    and be tagged with the "RTL433" hardware label.
    """
    rf_detector, out_queue = detector
    payload = {
        "freq": 433920000,
        "rssi": -68.5,
        "protocol": "FrSky",
    }

    rf_detector._handle_rtl433_line(json.dumps(payload))

    # get_nowait() raises queue.Empty if nothing was emitted, failing the test.
    emitted = out_queue.get_nowait()
    assert isinstance(emitted, RFObservation)
    assert emitted.frequency_hz == 433_920_000
    assert emitted.hardware == "RTL433"
    assert emitted.rssi == -68.5
def test_rtl433_non_json_line_ignored(detector):
    """A line that is not JSON must leave the output queue empty."""
    rf_detector, out_queue = detector
    garbage_line = "not json at all"

    rf_detector._handle_rtl433_line(garbage_line)

    assert out_queue.empty()
def test_hackrf_sweep_line_emits_observation(detector):
    """A 2.44 GHz hackrf_sweep row with strong bins yields an RFObservation.

    The observation must be tagged "HACKRF", report the row's lower
    frequency bound, and carry a negative RSSI.
    """
    rf_detector, out_queue = detector

    # hackrf_sweep CSV layout:
    #   date, time, hz_low, hz_high, hz_bin_width, num_samples, db, db, ...
    hz_low, hz_high = 2_440_000_000, 2_441_000_000
    sweep_line = f"2026-05-03, 12:00:00, {hz_low}, {hz_high}, 1000000, 10, -45.2, -46.1, -44.8"

    rf_detector._handle_hackrf_line(sweep_line)

    # get_nowait() raises queue.Empty if nothing was emitted, failing the test.
    emitted = out_queue.get_nowait()
    assert isinstance(emitted, RFObservation)
    assert emitted.hardware == "HACKRF"
    assert emitted.frequency_hz == hz_low
    assert emitted.rssi < 0
def test_hackrf_sweep_below_threshold_ignored(detector):
    """A hackrf_sweep row whose bins are all very weak emits nothing."""
    rf_detector, out_queue = detector
    hz_low, hz_high = 2_440_000_000, 2_441_000_000

    # Every bin sits below the -90 dBm threshold, so the row should be dropped.
    sweep_line = f"2026-05-03, 12:00:00, {hz_low}, {hz_high}, 1000000, 10, -95.0, -96.0, -95.5"

    rf_detector._handle_hackrf_line(sweep_line)

    assert out_queue.empty()
def test_start_stop(detector):
    """start() marks the detector as running and stop() clears it again.

    ``subprocess.Popen`` and ``shutil.which`` are patched so that no real
    tool is looked up or launched; the fake process's ``stdout.readline``
    returns ``b""`` on its first call.
    """
    rf_detector, _ = detector

    fake_proc = MagicMock()
    fake_proc.stdout = MagicMock()
    fake_proc.stdout.readline = MagicMock(side_effect=[b""])

    popen_patch = patch("subprocess.Popen", return_value=fake_proc)
    which_patch = patch("shutil.which", return_value="/usr/bin/rtl_433")

    # Entered left to right, i.e. the same order as nesting the two patches.
    with popen_patch, which_patch:
        rf_detector.start(rtl_sdr_index=0)
        assert rf_detector.running
        rf_detector.stop()
        assert not rf_detector.running