From 59713ffc2282a897412a5ad23a6f47141c80ac4c Mon Sep 17 00:00:00 2001 From: James Smith Date: Sun, 3 May 2026 17:33:55 +0100 Subject: [PATCH] fix(drone): harden RFDetector threading, subprocess lifecycle, and frequency accuracy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace _running bool with threading.Event for correct cross-thread visibility - Add _proc_lock to guard _rtl_proc/_hackrf_proc across worker/main threads - Use register_process + safe_terminate (pipe close + SIGKILL fallback on timeout) - Compute HackRF frequency as band midpoint (hz_low+hz_high)//2, not hz_low - Guard start() for idempotency — double-call no longer leaks threads Co-Authored-By: Claude Sonnet 4.6 --- tests/test_drone_rf_detector.py | 2 +- utils/drone/rf_detector.py | 49 +++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/tests/test_drone_rf_detector.py b/tests/test_drone_rf_detector.py index 3742093..6ee654c 100644 --- a/tests/test_drone_rf_detector.py +++ b/tests/test_drone_rf_detector.py @@ -56,7 +56,7 @@ def test_hackrf_sweep_line_emits_observation(detector): obs = q.get_nowait() assert isinstance(obs, RFObservation) assert obs.hardware == "HACKRF" - assert obs.frequency_hz == hz_low + assert obs.frequency_hz == (hz_low + hz_high) // 2 assert obs.rssi < 0 diff --git a/utils/drone/rf_detector.py b/utils/drone/rf_detector.py index adbd818..eea689f 100644 --- a/utils/drone/rf_detector.py +++ b/utils/drone/rf_detector.py @@ -11,6 +11,8 @@ import subprocess import threading from datetime import datetime, timezone +from utils.process import register_process, safe_terminate + from .models import RFObservation from .signatures import match_signature @@ -32,14 +34,16 @@ def _in_drone_band(freq_hz: int) -> bool: class RFDetector: def __init__(self, output_queue: queue.Queue) -> None: self._queue = output_queue - self._running = False + self._stop_event = threading.Event() + self._stop_event.set() # starts in stopped state + self._proc_lock = threading.Lock() self._rtl_proc: subprocess.Popen | None = None self._hackrf_proc: subprocess.Popen | None = None self._threads: list[threading.Thread] = [] @property def running(self) -> bool: - return self._running + return not self._stop_event.is_set() def _handle_rtl433_line(self, line: str) -> None: try: @@ -71,6 +75,7 @@ class RFDetector: return try: hz_low = int(parts[2]) + hz_high = int(parts[3]) db_values = [float(p) for p in parts[6:] if p] except (ValueError, IndexError): return @@ -79,13 +84,14 @@ class RFDetector: avg_db = sum(db_values) / len(db_values) if avg_db < _HACKRF_THRESHOLD_DBM: return - if not _in_drone_band(hz_low): + freq_hz = (hz_low + hz_high) // 2 + if not _in_drone_band(freq_hz): return - protocol = match_signature(hz_low) + protocol = match_signature(freq_hz) with contextlib.suppress(queue.Full): self._queue.put_nowait( RFObservation( - frequency_hz=hz_low, + frequency_hz=freq_hz, protocol=protocol, rssi=avg_db, hardware="HACKRF", @@ -101,11 +107,14 @@ class RFDetector: cmd = [rtl_bin, "-d", str(device_index), "-F", "json", "-f", "433920000", "-f", "868300000"] try: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) - self._rtl_proc = proc + register_process(proc) + with self._proc_lock: + self._rtl_proc = proc for raw_line in iter(proc.stdout.readline, b""): - if not self._running: + if self._stop_event.is_set(): break self._handle_rtl433_line(raw_line.decode("utf-8", errors="replace").strip()) + safe_terminate(proc) except Exception as exc: logger.warning("rtl_433 error: %s", exc) @@ -117,16 +126,21 @@ class RFDetector: cmd = [hackrf_bin, "-f", "2400:2484", "-f", "5725:5875", "-w", "1000000"] try: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) - self._hackrf_proc = proc + register_process(proc) + with self._proc_lock: + self._hackrf_proc = proc for raw_line in iter(proc.stdout.readline, b""): - if not self._running: + if self._stop_event.is_set(): break self._handle_hackrf_line(raw_line.decode("utf-8", errors="replace").strip()) + safe_terminate(proc) except Exception as exc: logger.warning("hackrf_sweep error: %s", exc) def start(self, rtl_sdr_index: int = 0, use_hackrf: bool = True) -> None: - self._running = True + if self.running: + return + self._stop_event.clear() t1 = threading.Thread(target=self._run_rtl433, args=(rtl_sdr_index,), daemon=True) t1.start() self._threads.append(t1) @@ -136,11 +150,12 @@ class RFDetector: self._threads.append(t2) def stop(self) -> None: - self._running = False - for proc in (self._rtl_proc, self._hackrf_proc): - if proc: - with contextlib.suppress(Exception): - proc.terminate() - self._rtl_proc = None - self._hackrf_proc = None + self._stop_event.set() + with self._proc_lock: + rtl_proc = self._rtl_proc + hackrf_proc = self._hackrf_proc + self._rtl_proc = None + self._hackrf_proc = None + safe_terminate(rtl_proc) + safe_terminate(hackrf_proc) self._threads.clear()