fix(drone): harden RFDetector threading, subprocess lifecycle, and frequency accuracy

- Replace _running bool with threading.Event for correct cross-thread visibility
- Add _proc_lock to guard _rtl_proc/_hackrf_proc across worker/main threads
- Use register_process + safe_terminate (pipe close + SIGKILL fallback on timeout)
- Compute HackRF frequency as band midpoint (hz_low+hz_high)//2, not hz_low
- Guard start() for idempotency — double-call no longer leaks threads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-05-03 17:33:55 +01:00
parent 681a498461
commit 59713ffc22
2 changed files with 33 additions and 18 deletions
+1 -1
View File
@@ -56,7 +56,7 @@ def test_hackrf_sweep_line_emits_observation(detector):
obs = q.get_nowait()
assert isinstance(obs, RFObservation)
assert obs.hardware == "HACKRF"
assert obs.frequency_hz == hz_low
assert obs.frequency_hz == (hz_low + hz_high) // 2
assert obs.rssi < 0
+32 -17
View File
@@ -11,6 +11,8 @@ import subprocess
import threading
from datetime import datetime, timezone
from utils.process import register_process, safe_terminate
from .models import RFObservation
from .signatures import match_signature
@@ -32,14 +34,16 @@ def _in_drone_band(freq_hz: int) -> bool:
class RFDetector:
def __init__(self, output_queue: queue.Queue) -> None:
self._queue = output_queue
self._running = False
self._stop_event = threading.Event()
self._stop_event.set() # starts in stopped state
self._proc_lock = threading.Lock()
self._rtl_proc: subprocess.Popen | None = None
self._hackrf_proc: subprocess.Popen | None = None
self._threads: list[threading.Thread] = []
@property
def running(self) -> bool:
return self._running
return not self._stop_event.is_set()
def _handle_rtl433_line(self, line: str) -> None:
try:
@@ -71,6 +75,7 @@ class RFDetector:
return
try:
hz_low = int(parts[2])
hz_high = int(parts[3])
db_values = [float(p) for p in parts[6:] if p]
except (ValueError, IndexError):
return
@@ -79,13 +84,14 @@ class RFDetector:
avg_db = sum(db_values) / len(db_values)
if avg_db < _HACKRF_THRESHOLD_DBM:
return
if not _in_drone_band(hz_low):
freq_hz = (hz_low + hz_high) // 2
if not _in_drone_band(freq_hz):
return
protocol = match_signature(hz_low)
protocol = match_signature(freq_hz)
with contextlib.suppress(queue.Full):
self._queue.put_nowait(
RFObservation(
frequency_hz=hz_low,
frequency_hz=freq_hz,
protocol=protocol,
rssi=avg_db,
hardware="HACKRF",
@@ -101,11 +107,14 @@ class RFDetector:
cmd = [rtl_bin, "-d", str(device_index), "-F", "json", "-f", "433920000", "-f", "868300000"]
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
self._rtl_proc = proc
register_process(proc)
with self._proc_lock:
self._rtl_proc = proc
for raw_line in iter(proc.stdout.readline, b""):
if not self._running:
if self._stop_event.is_set():
break
self._handle_rtl433_line(raw_line.decode("utf-8", errors="replace").strip())
safe_terminate(proc)
except Exception as exc:
logger.warning("rtl_433 error: %s", exc)
@@ -117,16 +126,21 @@ class RFDetector:
cmd = [hackrf_bin, "-f", "2400:2484", "-f", "5725:5875", "-w", "1000000"]
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
self._hackrf_proc = proc
register_process(proc)
with self._proc_lock:
self._hackrf_proc = proc
for raw_line in iter(proc.stdout.readline, b""):
if not self._running:
if self._stop_event.is_set():
break
self._handle_hackrf_line(raw_line.decode("utf-8", errors="replace").strip())
safe_terminate(proc)
except Exception as exc:
logger.warning("hackrf_sweep error: %s", exc)
def start(self, rtl_sdr_index: int = 0, use_hackrf: bool = True) -> None:
self._running = True
if self.running:
return
self._stop_event.clear()
t1 = threading.Thread(target=self._run_rtl433, args=(rtl_sdr_index,), daemon=True)
t1.start()
self._threads.append(t1)
@@ -136,11 +150,12 @@ class RFDetector:
self._threads.append(t2)
def stop(self) -> None:
self._running = False
for proc in (self._rtl_proc, self._hackrf_proc):
if proc:
with contextlib.suppress(Exception):
proc.terminate()
self._rtl_proc = None
self._hackrf_proc = None
self._stop_event.set()
with self._proc_lock:
rtl_proc = self._rtl_proc
hackrf_proc = self._hackrf_proc
self._rtl_proc = None
self._hackrf_proc = None
safe_terminate(rtl_proc)
safe_terminate(hackrf_proc)
self._threads.clear()