From 1403d490499698168e6ac7f580fc853002259d0c Mon Sep 17 00:00:00 2001 From: Smittix Date: Thu, 5 Mar 2026 09:31:21 +0000 Subject: [PATCH] fix: restore HackRF One/Pro detection when PATH is restricted --- tests/test_sdr_detection.py | 45 +++++-- tests/test_subghz.py | 109 ++++++++-------- utils/sdr/detection.py | 146 ++++++++++++---------- utils/subghz.py | 240 +++++++++++++++++++----------------- 4 files changed, 296 insertions(+), 244 deletions(-) diff --git a/tests/test_sdr_detection.py b/tests/test_sdr_detection.py index 8c00e24..3a3f749 100644 --- a/tests/test_sdr_detection.py +++ b/tests/test_sdr_detection.py @@ -17,9 +17,9 @@ def _clear_detection_caches(): yield -@patch('utils.sdr.detection._check_tool', return_value=True) +@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/rtl_test') @patch('utils.sdr.detection.subprocess.run') -def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_check_tool): +def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_tool_path): """Ignore malformed rtl_test rows that have an empty SN field.""" mock_result = MagicMock() mock_result.stdout = "" @@ -40,9 +40,9 @@ def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_chec assert devices[0].serial == "1" -@patch('utils.sdr.detection._check_tool', return_value=True) +@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/rtl_test') @patch('utils.sdr.detection.subprocess.run') -def test_detect_rtlsdr_devices_uses_replace_decode_mode(mock_run, _mock_check_tool): +def test_detect_rtlsdr_devices_uses_replace_decode_mode(mock_run, _mock_tool_path): """Run rtl_test with tolerant decoding for malformed output bytes.""" mock_result = MagicMock() mock_result.stdout = "" @@ -74,9 +74,9 @@ HACKRF_INFO_OUTPUT = ( ) -@patch('utils.sdr.detection._check_tool', return_value=True) +@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info') @patch('utils.sdr.detection.subprocess.run') -def test_detect_hackrf_from_stdout(mock_run, _mock_check_tool): +def test_detect_hackrf_from_stdout(mock_run, _mock_tool_path): """Parse HackRF device info from stdout.""" mock_result = MagicMock() mock_result.stdout = HACKRF_INFO_OUTPUT @@ -92,9 +92,9 @@ def test_detect_hackrf_from_stdout(mock_run, _mock_check_tool): assert devices[0].index == 0 -@patch('utils.sdr.detection._check_tool', return_value=True) +@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info') @patch('utils.sdr.detection.subprocess.run') -def test_detect_hackrf_from_stderr(mock_run, _mock_check_tool): +def test_detect_hackrf_from_stderr(mock_run, _mock_tool_path): """Parse HackRF device info when output goes to stderr (newer firmware).""" mock_result = MagicMock() mock_result.stdout = "" @@ -109,9 +109,9 @@ def test_detect_hackrf_from_stderr(mock_run, _mock_check_tool): assert devices[0].serial == "0000000000000000a06063c8234e925f" -@patch('utils.sdr.detection._check_tool', return_value=True) +@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info') @patch('utils.sdr.detection.subprocess.run') -def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_check_tool): +def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_tool_path): """Parse HackRF info even when hackrf_info exits non-zero (device busy).""" mock_result = MagicMock() mock_result.returncode = 1 @@ -125,9 +125,9 @@ def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_check_tool assert devices[0].name == "HackRF One" -@patch('utils.sdr.detection._check_tool', return_value=True) +@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info') @patch('utils.sdr.detection.subprocess.run') -def test_detect_hackrf_fallback_no_serial(mock_run, _mock_check_tool): +def test_detect_hackrf_fallback_no_serial(mock_run, _mock_tool_path): """Fallback detection when serial is missing but 'Found HackRF' present.""" mock_result = MagicMock() mock_result.stdout = "Found HackRF\nBoard ID Number: 2 (HackRF One)\n" @@ -139,3 +139,24 @@ def test_detect_hackrf_fallback_no_serial(mock_run, _mock_check_tool): assert len(devices) == 1 assert devices[0].name == "HackRF One" assert devices[0].serial == "Unknown" + + +@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info') +@patch('utils.sdr.detection.subprocess.run') +def test_detect_hackrf_parses_legacy_serial_format(mock_run, _mock_tool_path): + """Accept legacy 'Serial Number' casing and spaced hex format.""" + mock_result = MagicMock() + mock_result.stdout = ( + "Found HackRF\n" + "Index: 0\n" + "Serial Number: 0x00000000 00000000 a06063c8 234e925f\n" + "Board ID Number: 3 (HackRF Pro)\n" + ) + mock_result.stderr = "" + mock_run.return_value = mock_result + + devices = detect_hackrf_devices() + + assert len(devices) == 1 + assert devices[0].name == "HackRF Pro" + assert devices[0].serial == "0000000000000000a06063c8234e925f" diff --git a/tests/test_subghz.py b/tests/test_subghz.py index 5de373f..2f52870 100644 --- a/tests/test_subghz.py +++ b/tests/test_subghz.py @@ -43,15 +43,16 @@ class TestSubGhzManagerInit: assert status['mode'] == 'idle' -class TestToolDetection: +class TestToolDetection: def test_check_hackrf_found(self, manager): with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'): assert manager.check_hackrf() is True - def test_check_hackrf_not_found(self, manager): - with patch('shutil.which', return_value=None): - manager._hackrf_available = None # reset cache - assert manager.check_hackrf() is False + def test_check_hackrf_not_found(self, manager): + with patch('shutil.which', return_value=None), \ + patch('utils.subghz.get_tool_path', return_value=None): + manager._hackrf_available = None # reset cache + assert manager.check_hackrf() is False def test_check_rtl433_found(self, manager): with patch('shutil.which', return_value='/usr/bin/rtl_433'): @@ -62,13 +63,14 @@ class TestToolDetection: assert manager.check_sweep() is True -class TestReceive: - def test_start_receive_no_hackrf(self, manager): - with patch('shutil.which', return_value=None): - manager._hackrf_available = None - result = manager.start_receive(frequency_hz=433920000) - assert result['status'] == 'error' - assert 'not found' in result['message'] +class TestReceive: + def test_start_receive_no_hackrf(self, manager): + with patch('shutil.which', return_value=None), \ + patch('utils.subghz.get_tool_path', return_value=None): + manager._hackrf_available = None + result = manager.start_receive(frequency_hz=433920000) + assert result['status'] == 'error' + assert 'not found' in result['message'] def test_start_receive_success(self, manager): mock_proc = MagicMock() @@ -164,11 +166,12 @@ class TestTxSafety: result = SubGhzManager.validate_tx_frequency(500000000) # 500 MHz assert result is not None - def test_transmit_no_hackrf(self, manager): - with patch('shutil.which', return_value=None): - manager._hackrf_available = None - result = manager.transmit(capture_id='abc123') - assert result['status'] == 'error' + def test_transmit_no_hackrf(self, manager): + with patch('shutil.which', return_value=None), \ + patch('utils.subghz.get_tool_path', return_value=None): + manager._hackrf_available = None + result = manager.transmit(capture_id='abc123') + assert result['status'] == 'error' def test_transmit_capture_not_found(self, manager): with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \ @@ -464,12 +467,13 @@ class TestCaptureLibrary: assert all(c.fingerprint_group_size == 2 for c in captures) -class TestSweep: - def test_start_sweep_no_tool(self, manager): - with patch('shutil.which', return_value=None): - manager._sweep_available = None - result = manager.start_sweep() - assert result['status'] == 'error' +class TestSweep: + def test_start_sweep_no_tool(self, manager): + with patch('shutil.which', return_value=None), \ + patch('utils.subghz.get_tool_path', return_value=None): + manager._sweep_available = None + result = manager.start_sweep() + assert result['status'] == 'error' def test_start_sweep_success(self, manager): import time as _time @@ -494,14 +498,15 @@ class TestSweep: assert result['status'] == 'not_running' -class TestDecode: - def test_start_decode_no_hackrf(self, manager): - with patch('shutil.which', return_value=None): - manager._hackrf_available = None - manager._rtl433_available = None - result = manager.start_decode(frequency_hz=433920000) - assert result['status'] == 'error' - assert 'hackrf_transfer' in result['message'] +class TestDecode: + def test_start_decode_no_hackrf(self, manager): + with patch('shutil.which', return_value=None), \ + patch('utils.subghz.get_tool_path', return_value=None): + manager._hackrf_available = None + manager._rtl433_available = None + result = manager.start_decode(frequency_hz=433920000) + assert result['status'] == 'error' + assert 'hackrf_transfer' in result['message'] def test_start_decode_no_rtl433(self, manager): def which_side_effect(name): @@ -509,12 +514,13 @@ class TestDecode: return '/usr/bin/hackrf_transfer' return None - with patch('shutil.which', side_effect=which_side_effect): - manager._hackrf_available = None - manager._rtl433_available = None - result = manager.start_decode(frequency_hz=433920000) - assert result['status'] == 'error' - assert 'rtl_433' in result['message'] + with patch('shutil.which', side_effect=which_side_effect), \ + patch('utils.subghz.get_tool_path', return_value=None): + manager._hackrf_available = None + manager._rtl433_available = None + result = manager.start_decode(frequency_hz=433920000) + assert result['status'] == 'error' + assert 'rtl_433' in result['message'] def test_start_decode_success(self, manager): mock_hackrf_proc = MagicMock() @@ -537,9 +543,12 @@ class TestDecode: return mock_hackrf_proc return mock_rtl433_proc - with patch('shutil.which', return_value='/usr/bin/tool'), \ - patch('subprocess.Popen', side_effect=popen_side_effect) as mock_popen, \ - patch('utils.subghz.register_process'): + def which_side_effect(name): + return f'/usr/bin/{name}' + + with patch('shutil.which', side_effect=which_side_effect), \ + patch('subprocess.Popen', side_effect=popen_side_effect) as mock_popen, \ + patch('utils.subghz.register_process'): import time as _time manager._hackrf_available = None manager._rtl433_available = None @@ -556,16 +565,16 @@ class TestDecode: # Two processes: hackrf_transfer + rtl_433 assert mock_popen.call_count == 2 - # Verify hackrf_transfer command - hackrf_cmd = mock_popen.call_args_list[0][0][0] - assert hackrf_cmd[0] == 'hackrf_transfer' - assert '-r' in hackrf_cmd - - # Verify rtl_433 command - rtl433_cmd = mock_popen.call_args_list[1][0][0] - assert rtl433_cmd[0] == 'rtl_433' - assert '-r' in rtl433_cmd - assert 'cs8:-' in rtl433_cmd + # Verify hackrf_transfer command + hackrf_cmd = mock_popen.call_args_list[0][0][0] + assert os.path.basename(hackrf_cmd[0]) == 'hackrf_transfer' + assert '-r' in hackrf_cmd + + # Verify rtl_433 command + rtl433_cmd = mock_popen.call_args_list[1][0][0] + assert os.path.basename(rtl433_cmd[0]) == 'rtl_433' + assert '-r' in rtl433_cmd + assert 'cs8:-' in rtl433_cmd # Both processes tracked assert manager._decode_hackrf_process is mock_hackrf_proc diff --git a/utils/sdr/detection.py b/utils/sdr/detection.py index 5f08725..c1fc0de 100644 --- a/utils/sdr/detection.py +++ b/utils/sdr/detection.py @@ -6,14 +6,15 @@ Detects RTL-SDR devices via rtl_test and other SDR hardware via SoapySDR. from __future__ import annotations -import logging -import re -import shutil -import subprocess -import time -from typing import Optional - -from .base import SDRCapabilities, SDRDevice, SDRType +import logging +import re +import subprocess +import time +from typing import Optional + +from utils.dependencies import get_tool_path + +from .base import SDRCapabilities, SDRDevice, SDRType logger = logging.getLogger(__name__) @@ -43,12 +44,7 @@ def _hackrf_probe_blocked() -> bool: return False -def _check_tool(name: str) -> bool: - """Check if a tool is available in PATH.""" - return shutil.which(name) is not None - - -def _get_capabilities_for_type(sdr_type: SDRType) -> SDRCapabilities: +def _get_capabilities_for_type(sdr_type: SDRType) -> SDRCapabilities: """Get default capabilities for an SDR type.""" # Import here to avoid circular imports from .rtlsdr import RTLSDRCommandBuilder @@ -100,7 +96,7 @@ def _driver_to_sdr_type(driver: str) -> Optional[SDRType]: return mapping.get(driver.lower()) -def detect_rtlsdr_devices() -> list[SDRDevice]: +def detect_rtlsdr_devices() -> list[SDRDevice]: """ Detect RTL-SDR devices using rtl_test. @@ -109,9 +105,10 @@ def detect_rtlsdr_devices() -> list[SDRDevice]: """ devices: list[SDRDevice] = [] - if not _check_tool('rtl_test'): - logger.debug("rtl_test not found, skipping RTL-SDR detection") - return devices + rtl_test_path = get_tool_path('rtl_test') + if not rtl_test_path: + logger.debug("rtl_test not found, skipping RTL-SDR detection") + return devices try: import os @@ -122,11 +119,11 @@ def detect_rtlsdr_devices() -> list[SDRDevice]: lib_paths = ['/usr/local/lib', '/opt/homebrew/lib'] current_ld = env.get('DYLD_LIBRARY_PATH', '') env['DYLD_LIBRARY_PATH'] = ':'.join(lib_paths + [current_ld] if current_ld else lib_paths) - result = subprocess.run( - ['rtl_test', '-t'], - capture_output=True, - text=True, - encoding='utf-8', + result = subprocess.run( + [rtl_test_path, '-t'], + capture_output=True, + text=True, + encoding='utf-8', errors='replace', timeout=5, env=env @@ -176,13 +173,14 @@ def detect_rtlsdr_devices() -> list[SDRDevice]: return devices -def _find_soapy_util() -> str | None: - """Find SoapySDR utility command (name varies by distribution).""" - # Try different command names used across distributions - for cmd in ['SoapySDRUtil', 'soapy_sdr_util', 'soapysdr-util']: - if _check_tool(cmd): - return cmd - return None +def _find_soapy_util() -> str | None: + """Find SoapySDR utility command (name varies by distribution).""" + # Try different command names used across distributions + for cmd in ['SoapySDRUtil', 'soapy_sdr_util', 'soapysdr-util']: + tool_path = get_tool_path(cmd) + if tool_path: + return tool_path + return None def _get_soapy_env() -> dict: @@ -324,7 +322,7 @@ def _add_soapy_device( )) -def detect_hackrf_devices() -> list[SDRDevice]: +def detect_hackrf_devices() -> list[SDRDevice]: """ Detect HackRF devices using native hackrf_info tool. @@ -343,33 +341,46 @@ def detect_hackrf_devices() -> list[SDRDevice]: devices: list[SDRDevice] = [] - if not _check_tool('hackrf_info'): - _hackrf_cache = devices - _hackrf_cache_ts = now - return devices + hackrf_info_path = get_tool_path('hackrf_info') + if not hackrf_info_path: + _hackrf_cache = devices + _hackrf_cache_ts = now + return devices try: - result = subprocess.run( - ['hackrf_info'], - capture_output=True, - text=True, - timeout=5 - ) + result = subprocess.run( + [hackrf_info_path], + capture_output=True, + text=True, + timeout=5 + ) # Combine stdout + stderr: newer firmware may print to stderr, # and hackrf_info may exit non-zero when device is briefly busy # but still output valid info. - output = result.stdout + result.stderr - - # Parse hackrf_info output - # Extract board name from "Board ID Number: X (Name)" and serial - from .hackrf import HackRFCommandBuilder - - serial_pattern = r'Serial number:\s*(\S+)' - board_pattern = r'Board ID Number:\s*\d+\s*\(([^)]+)\)' - - serials_found = re.findall(serial_pattern, output) - boards_found = re.findall(board_pattern, output) + output = f"{result.stdout or ''}\n{result.stderr or ''}" + + # Parse hackrf_info output + # Extract board name from "Board ID Number: X (Name)" and serial + from .hackrf import HackRFCommandBuilder + + serial_pattern = re.compile( + r'^\s*Serial\s+number:\s*(.+)$', + re.IGNORECASE | re.MULTILINE, + ) + board_pattern = re.compile( + r'Board\s+ID\s+Number:\s*\d+\s*\(([^)]+)\)', + re.IGNORECASE, + ) + + serials_found = [] + for raw in serial_pattern.findall(output): + # Normalise legacy formats like "0x1234 5678" to plain hex. + serial = re.sub(r'0x', '', raw, flags=re.IGNORECASE) + serial = re.sub(r'[^0-9A-Fa-f]', '', serial) + if serial: + serials_found.append(serial) + boards_found = board_pattern.findall(output) for i, serial in enumerate(serials_found): board_name = boards_found[i] if i < len(boards_found) else 'HackRF' @@ -383,11 +394,11 @@ def detect_hackrf_devices() -> list[SDRDevice]: )) # Fallback: check if any HackRF found without serial - if not devices and 'Found HackRF' in output: - board_match = re.search(board_pattern, output) - board_name = board_match.group(1) if board_match else 'HackRF' - devices.append(SDRDevice( - sdr_type=SDRType.HACKRF, + if not devices and re.search(r'Found\s+HackRF', output, re.IGNORECASE): + board_match = board_pattern.search(output) + board_name = board_match.group(1) if board_match else 'HackRF' + devices.append(SDRDevice( + sdr_type=SDRType.HACKRF, index=0, name=board_name, serial='Unknown', @@ -403,7 +414,7 @@ def detect_hackrf_devices() -> list[SDRDevice]: return devices -def probe_rtlsdr_device(device_index: int) -> str | None: +def probe_rtlsdr_device(device_index: int) -> str | None: """Probe whether an RTL-SDR device is available at the USB level. Runs a quick ``rtl_test`` invocation targeting a single device to @@ -417,10 +428,11 @@ def probe_rtlsdr_device(device_index: int) -> str | None: An error message string if the device cannot be opened, or ``None`` if the device is available. """ - if not _check_tool('rtl_test'): - # Can't probe without rtl_test — let the caller proceed and - # surface errors from the actual decoder process instead. - return None + rtl_test_path = get_tool_path('rtl_test') + if not rtl_test_path: + # Can't probe without rtl_test — let the caller proceed and + # surface errors from the actual decoder process instead. + return None try: import os @@ -437,11 +449,11 @@ def probe_rtlsdr_device(device_index: int) -> str | None: # Use Popen with early termination instead of run() with full timeout. # rtl_test prints device info to stderr quickly, then keeps running # its test loop. We kill it as soon as we see success or failure. - proc = subprocess.Popen( - ['rtl_test', '-d', str(device_index), '-t'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, + proc = subprocess.Popen( + [rtl_test_path, '-d', str(device_index), '-t'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, env=env, ) diff --git a/utils/subghz.py b/utils/subghz.py index 99686d9..422cc95 100644 --- a/utils/subghz.py +++ b/utils/subghz.py @@ -21,11 +21,12 @@ from datetime import datetime, timezone from pathlib import Path from typing import BinaryIO, Callable -import numpy as np - -from utils.logging import get_logger -from utils.process import register_process, safe_terminate, unregister_process -from utils.constants import ( +import numpy as np + +from utils.dependencies import get_tool_path +from utils.logging import get_logger +from utils.process import register_process, safe_terminate, unregister_process +from utils.constants import ( SUBGHZ_TX_ALLOWED_BANDS, SUBGHZ_FREQ_MIN_MHZ, SUBGHZ_FREQ_MAX_MHZ, @@ -187,19 +188,23 @@ class SubGhzManager: except Exception as e: logger.error(f"Error in SubGHz callback: {e}") - # ------------------------------------------------------------------ - # Tool detection - # ------------------------------------------------------------------ - - def check_hackrf(self) -> bool: - if self._hackrf_available is None: - self._hackrf_available = shutil.which('hackrf_transfer') is not None - return self._hackrf_available - - def check_hackrf_info(self) -> bool: - if self._hackrf_info_available is None: - self._hackrf_info_available = shutil.which('hackrf_info') is not None - return self._hackrf_info_available + # ------------------------------------------------------------------ + # Tool detection + # ------------------------------------------------------------------ + + def _resolve_tool(self, name: str) -> str | None: + """Resolve executable path via PATH first, then platform-aware fallbacks.""" + return shutil.which(name) or get_tool_path(name) + + def check_hackrf(self) -> bool: + if self._hackrf_available is None: + self._hackrf_available = self._resolve_tool('hackrf_transfer') is not None + return self._hackrf_available + + def check_hackrf_info(self) -> bool: + if self._hackrf_info_available is None: + self._hackrf_info_available = self._resolve_tool('hackrf_info') is not None + return self._hackrf_info_available def check_hackrf_device(self) -> bool | None: """Return True if a HackRF device is detected, False if not, or None if detection unavailable.""" @@ -228,15 +233,15 @@ class SubGhzManager: return 'HackRF device not detected' return None - def check_rtl433(self) -> bool: - if self._rtl433_available is None: - self._rtl433_available = shutil.which('rtl_433') is not None - return self._rtl433_available - - def check_sweep(self) -> bool: - if self._sweep_available is None: - self._sweep_available = shutil.which('hackrf_sweep') is not None - return self._sweep_available + def check_rtl433(self) -> bool: + if self._rtl433_available is None: + self._rtl433_available = self._resolve_tool('rtl_433') is not None + return self._rtl433_available + + def check_sweep(self) -> bool: + if self._sweep_available is None: + self._sweep_available = self._resolve_tool('hackrf_sweep') is not None + return self._sweep_available # ------------------------------------------------------------------ # Status @@ -307,23 +312,24 @@ class SubGhzManager: # RECEIVE (IQ capture via hackrf_transfer -r) # ------------------------------------------------------------------ - def start_receive( - self, - frequency_hz: int, - sample_rate: int = 2000000, + def start_receive( + self, + frequency_hz: int, + sample_rate: int = 2000000, lna_gain: int = 32, vga_gain: int = 20, trigger_enabled: bool = False, trigger_pre_ms: int = 350, - trigger_post_ms: int = 700, - device_serial: str | None = None, - ) -> dict: - # Pre-lock: tool availability & device detection (blocking I/O) - if not self.check_hackrf(): - return {'status': 'error', 'message': 'hackrf_transfer not found'} - device_err = self._require_hackrf_device() - if device_err: - return {'status': 'error', 'message': device_err} + trigger_post_ms: int = 700, + device_serial: str | None = None, + ) -> dict: + # Pre-lock: tool availability & device detection (blocking I/O) + hackrf_transfer_path = self._resolve_tool('hackrf_transfer') + if not hackrf_transfer_path: + return {'status': 'error', 'message': 'hackrf_transfer not found'} + device_err = self._require_hackrf_device() + if device_err: + return {'status': 'error', 'message': device_err} with self._lock: if self.active_mode != 'idle': @@ -339,11 +345,11 @@ class SubGhzManager: basename = f"{freq_mhz:.3f}MHz_{ts}" iq_file = self._captures_dir / f"{basename}.iq" - cmd = [ - 'hackrf_transfer', - '-r', str(iq_file), - '-f', str(frequency_hz), - '-s', str(sample_rate), + cmd = [ + hackrf_transfer_path, + '-r', str(iq_file), + '-f', str(frequency_hz), + '-s', str(sample_rate), '-l', str(lna_gain), '-g', str(vga_gain), ] @@ -1272,23 +1278,25 @@ class SubGhzManager: # DECODE (hackrf_transfer piped to rtl_433) # ------------------------------------------------------------------ - def start_decode( - self, - frequency_hz: int, - sample_rate: int = 2_000_000, + def start_decode( + self, + frequency_hz: int, + sample_rate: int = 2_000_000, lna_gain: int = 32, vga_gain: int = 20, - decode_profile: str = 'weather', - device_serial: str | None = None, - ) -> dict: - # Pre-lock: tool availability & device detection (blocking I/O) - if not self.check_hackrf(): - return {'status': 'error', 'message': 'hackrf_transfer not found'} - if not self.check_rtl433(): - return {'status': 'error', 'message': 'rtl_433 not found'} - device_err = self._require_hackrf_device() - if device_err: - return {'status': 'error', 'message': device_err} + decode_profile: str = 'weather', + device_serial: str | None = None, + ) -> dict: + # Pre-lock: tool availability & device detection (blocking I/O) + hackrf_transfer_path = self._resolve_tool('hackrf_transfer') + if not hackrf_transfer_path: + return {'status': 'error', 'message': 'hackrf_transfer not found'} + rtl433_path = self._resolve_tool('rtl_433') + if not rtl433_path: + return {'status': 'error', 'message': 'rtl_433 not found'} + device_err = self._require_hackrf_device() + if device_err: + return {'status': 'error', 'message': device_err} with self._lock: if self.active_mode != 'idle': @@ -1299,25 +1307,25 @@ class SubGhzManager: requested_sample_rate = int(sample_rate) stable_sample_rate = max(2_000_000, min(2_000_000, requested_sample_rate)) - # Build hackrf_transfer command (producer: raw IQ to stdout) - hackrf_cmd = [ - 'hackrf_transfer', - '-r', '-', - '-f', str(frequency_hz), - '-s', str(stable_sample_rate), + # Build hackrf_transfer command (producer: raw IQ to stdout) + hackrf_cmd = [ + hackrf_transfer_path, + '-r', '-', + '-f', str(frequency_hz), + '-s', str(stable_sample_rate), '-l', str(max(SUBGHZ_LNA_GAIN_MIN, min(SUBGHZ_LNA_GAIN_MAX, lna_gain))), '-g', str(max(SUBGHZ_VGA_GAIN_MIN, min(SUBGHZ_VGA_GAIN_MAX, vga_gain))), ] if device_serial: hackrf_cmd.extend(['-d', device_serial]) - # Build rtl_433 command (consumer: reads IQ from stdin) - # Feed signed 8-bit complex IQ directly from hackrf_transfer. - rtl433_cmd = [ - 'rtl_433', - '-r', 'cs8:-', - '-s', str(stable_sample_rate), - '-f', str(frequency_hz), + # Build rtl_433 command (consumer: reads IQ from stdin) + # Feed signed 8-bit complex IQ directly from hackrf_transfer. + rtl433_cmd = [ + rtl433_path, + '-r', 'cs8:-', + '-s', str(stable_sample_rate), + '-f', str(frequency_hz), '-F', 'json', '-F', 'log', '-M', 'level', @@ -1936,21 +1944,22 @@ class SubGhzManager: except OSError as exc: logger.debug(f"Failed to remove TX temp file {path}: {exc}") - def transmit( - self, - capture_id: str, - tx_gain: int = 20, + def transmit( + self, + capture_id: str, + tx_gain: int = 20, max_duration: int = 10, start_seconds: float | None = None, - duration_seconds: float | None = None, - device_serial: str | None = None, - ) -> dict: - # Pre-lock: tool availability & device detection (blocking I/O) - if not self.check_hackrf(): - return {'status': 'error', 'message': 'hackrf_transfer not found'} - device_err = self._require_hackrf_device() - if device_err: - return {'status': 'error', 'message': device_err} + duration_seconds: float | None = None, + device_serial: str | None = None, + ) -> dict: + # Pre-lock: tool availability & device detection (blocking I/O) + hackrf_transfer_path = self._resolve_tool('hackrf_transfer') + if not hackrf_transfer_path: + return {'status': 'error', 'message': 'hackrf_transfer not found'} + device_err = self._require_hackrf_device() + if device_err: + return {'status': 'error', 'message': device_err} # Pre-lock: capture lookup, validation, and segment I/O (can be large) capture = self._load_capture(capture_id) @@ -2046,14 +2055,14 @@ class SubGhzManager: # Clear any orphaned temp segment from a previous TX attempt. self._cleanup_tx_temp_file() - if segment_path_for_cleanup: - self._tx_temp_file = segment_path_for_cleanup - - cmd = [ - 'hackrf_transfer', - '-t', str(tx_path), - '-f', str(capture.frequency_hz), - '-s', str(capture.sample_rate), + if segment_path_for_cleanup: + self._tx_temp_file = segment_path_for_cleanup + + cmd = [ + hackrf_transfer_path, + '-t', str(tx_path), + '-f', str(capture.frequency_hz), + '-s', str(capture.sample_rate), '-x', str(tx_gain), ] if device_serial: @@ -2183,19 +2192,20 @@ class SubGhzManager: # SWEEP (hackrf_sweep) # ------------------------------------------------------------------ - def start_sweep( - self, - freq_start_mhz: float = 300.0, - freq_end_mhz: float = 928.0, - bin_width: int = 100000, - device_serial: str | None = None, - ) -> dict: - # Pre-lock: tool availability & device detection (blocking I/O) - if not self.check_sweep(): - return {'status': 'error', 'message': 'hackrf_sweep not found'} - device_err = self._require_hackrf_device() - if device_err: - return {'status': 'error', 'message': device_err} + def start_sweep( + self, + freq_start_mhz: float = 300.0, + freq_end_mhz: float = 928.0, + bin_width: int = 100000, + device_serial: str | None = None, + ) -> dict: + # Pre-lock: tool availability & device detection (blocking I/O) + hackrf_sweep_path = self._resolve_tool('hackrf_sweep') + if not hackrf_sweep_path: + return {'status': 'error', 'message': 'hackrf_sweep not found'} + device_err = self._require_hackrf_device() + if device_err: + return {'status': 'error', 'message': device_err} # Wait for previous sweep thread to exit (blocking) before lock if self._sweep_thread and self._sweep_thread.is_alive(): @@ -2204,14 +2214,14 @@ class SubGhzManager: return {'status': 'error', 'message': 'Previous sweep still shutting down'} with self._lock: - if self.active_mode != 'idle': - return {'status': 'error', 'message': f'Already running: {self.active_mode}'} - - cmd = [ - 'hackrf_sweep', - '-f', f'{int(freq_start_mhz)}:{int(freq_end_mhz)}', - '-w', str(bin_width), - ] + if self.active_mode != 'idle': + return {'status': 'error', 'message': f'Already running: {self.active_mode}'} + + cmd = [ + hackrf_sweep_path, + '-f', f'{int(freq_start_mhz)}:{int(freq_end_mhz)}', + '-w', str(bin_width), + ] if device_serial: cmd.extend(['-d', device_serial])