mirror of
https://github.com/smittix/intercept.git
synced 2026-06-08 14:11:54 -07:00
fix: restore HackRF One/Pro detection when PATH is restricted
This commit is contained in:
+33
-12
@@ -17,9 +17,9 @@ def _clear_detection_caches():
|
||||
yield
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/rtl_test')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_check_tool):
|
||||
def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_tool_path):
|
||||
"""Ignore malformed rtl_test rows that have an empty SN field."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ""
|
||||
@@ -40,9 +40,9 @@ def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_chec
|
||||
assert devices[0].serial == "1"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/rtl_test')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_rtlsdr_devices_uses_replace_decode_mode(mock_run, _mock_check_tool):
|
||||
def test_detect_rtlsdr_devices_uses_replace_decode_mode(mock_run, _mock_tool_path):
|
||||
"""Run rtl_test with tolerant decoding for malformed output bytes."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ""
|
||||
@@ -74,9 +74,9 @@ HACKRF_INFO_OUTPUT = (
|
||||
)
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_from_stdout(mock_run, _mock_check_tool):
|
||||
def test_detect_hackrf_from_stdout(mock_run, _mock_tool_path):
|
||||
"""Parse HackRF device info from stdout."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = HACKRF_INFO_OUTPUT
|
||||
@@ -92,9 +92,9 @@ def test_detect_hackrf_from_stdout(mock_run, _mock_check_tool):
|
||||
assert devices[0].index == 0
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_from_stderr(mock_run, _mock_check_tool):
|
||||
def test_detect_hackrf_from_stderr(mock_run, _mock_tool_path):
|
||||
"""Parse HackRF device info when output goes to stderr (newer firmware)."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ""
|
||||
@@ -109,9 +109,9 @@ def test_detect_hackrf_from_stderr(mock_run, _mock_check_tool):
|
||||
assert devices[0].serial == "0000000000000000a06063c8234e925f"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_check_tool):
|
||||
def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_tool_path):
|
||||
"""Parse HackRF info even when hackrf_info exits non-zero (device busy)."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.returncode = 1
|
||||
@@ -125,9 +125,9 @@ def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_check_tool
|
||||
assert devices[0].name == "HackRF One"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_fallback_no_serial(mock_run, _mock_check_tool):
|
||||
def test_detect_hackrf_fallback_no_serial(mock_run, _mock_tool_path):
|
||||
"""Fallback detection when serial is missing but 'Found HackRF' present."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = "Found HackRF\nBoard ID Number: 2 (HackRF One)\n"
|
||||
@@ -139,3 +139,24 @@ def test_detect_hackrf_fallback_no_serial(mock_run, _mock_check_tool):
|
||||
assert len(devices) == 1
|
||||
assert devices[0].name == "HackRF One"
|
||||
assert devices[0].serial == "Unknown"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_parses_legacy_serial_format(mock_run, _mock_tool_path):
|
||||
"""Accept legacy 'Serial Number' casing and spaced hex format."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = (
|
||||
"Found HackRF\n"
|
||||
"Index: 0\n"
|
||||
"Serial Number: 0x00000000 00000000 a06063c8 234e925f\n"
|
||||
"Board ID Number: 3 (HackRF Pro)\n"
|
||||
)
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
devices = detect_hackrf_devices()
|
||||
|
||||
assert len(devices) == 1
|
||||
assert devices[0].name == "HackRF Pro"
|
||||
assert devices[0].serial == "0000000000000000a06063c8234e925f"
|
||||
|
||||
+59
-50
@@ -43,15 +43,16 @@ class TestSubGhzManagerInit:
|
||||
assert status['mode'] == 'idle'
|
||||
|
||||
|
||||
class TestToolDetection:
|
||||
class TestToolDetection:
|
||||
def test_check_hackrf_found(self, manager):
|
||||
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'):
|
||||
assert manager.check_hackrf() is True
|
||||
|
||||
def test_check_hackrf_not_found(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._hackrf_available = None # reset cache
|
||||
assert manager.check_hackrf() is False
|
||||
def test_check_hackrf_not_found(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None # reset cache
|
||||
assert manager.check_hackrf() is False
|
||||
|
||||
def test_check_rtl433_found(self, manager):
|
||||
with patch('shutil.which', return_value='/usr/bin/rtl_433'):
|
||||
@@ -62,13 +63,14 @@ class TestToolDetection:
|
||||
assert manager.check_sweep() is True
|
||||
|
||||
|
||||
class TestReceive:
|
||||
def test_start_receive_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
result = manager.start_receive(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'not found' in result['message']
|
||||
class TestReceive:
|
||||
def test_start_receive_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
result = manager.start_receive(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'not found' in result['message']
|
||||
|
||||
def test_start_receive_success(self, manager):
|
||||
mock_proc = MagicMock()
|
||||
@@ -164,11 +166,12 @@ class TestTxSafety:
|
||||
result = SubGhzManager.validate_tx_frequency(500000000) # 500 MHz
|
||||
assert result is not None
|
||||
|
||||
def test_transmit_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
result = manager.transmit(capture_id='abc123')
|
||||
assert result['status'] == 'error'
|
||||
def test_transmit_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
result = manager.transmit(capture_id='abc123')
|
||||
assert result['status'] == 'error'
|
||||
|
||||
def test_transmit_capture_not_found(self, manager):
|
||||
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
|
||||
@@ -464,12 +467,13 @@ class TestCaptureLibrary:
|
||||
assert all(c.fingerprint_group_size == 2 for c in captures)
|
||||
|
||||
|
||||
class TestSweep:
|
||||
def test_start_sweep_no_tool(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._sweep_available = None
|
||||
result = manager.start_sweep()
|
||||
assert result['status'] == 'error'
|
||||
class TestSweep:
|
||||
def test_start_sweep_no_tool(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._sweep_available = None
|
||||
result = manager.start_sweep()
|
||||
assert result['status'] == 'error'
|
||||
|
||||
def test_start_sweep_success(self, manager):
|
||||
import time as _time
|
||||
@@ -494,14 +498,15 @@ class TestSweep:
|
||||
assert result['status'] == 'not_running'
|
||||
|
||||
|
||||
class TestDecode:
|
||||
def test_start_decode_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
result = manager.start_decode(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'hackrf_transfer' in result['message']
|
||||
class TestDecode:
|
||||
def test_start_decode_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
result = manager.start_decode(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'hackrf_transfer' in result['message']
|
||||
|
||||
def test_start_decode_no_rtl433(self, manager):
|
||||
def which_side_effect(name):
|
||||
@@ -509,12 +514,13 @@ class TestDecode:
|
||||
return '/usr/bin/hackrf_transfer'
|
||||
return None
|
||||
|
||||
with patch('shutil.which', side_effect=which_side_effect):
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
result = manager.start_decode(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'rtl_433' in result['message']
|
||||
with patch('shutil.which', side_effect=which_side_effect), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
result = manager.start_decode(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'rtl_433' in result['message']
|
||||
|
||||
def test_start_decode_success(self, manager):
|
||||
mock_hackrf_proc = MagicMock()
|
||||
@@ -537,9 +543,12 @@ class TestDecode:
|
||||
return mock_hackrf_proc
|
||||
return mock_rtl433_proc
|
||||
|
||||
with patch('shutil.which', return_value='/usr/bin/tool'), \
|
||||
patch('subprocess.Popen', side_effect=popen_side_effect) as mock_popen, \
|
||||
patch('utils.subghz.register_process'):
|
||||
def which_side_effect(name):
|
||||
return f'/usr/bin/{name}'
|
||||
|
||||
with patch('shutil.which', side_effect=which_side_effect), \
|
||||
patch('subprocess.Popen', side_effect=popen_side_effect) as mock_popen, \
|
||||
patch('utils.subghz.register_process'):
|
||||
import time as _time
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
@@ -556,16 +565,16 @@ class TestDecode:
|
||||
# Two processes: hackrf_transfer + rtl_433
|
||||
assert mock_popen.call_count == 2
|
||||
|
||||
# Verify hackrf_transfer command
|
||||
hackrf_cmd = mock_popen.call_args_list[0][0][0]
|
||||
assert hackrf_cmd[0] == 'hackrf_transfer'
|
||||
assert '-r' in hackrf_cmd
|
||||
|
||||
# Verify rtl_433 command
|
||||
rtl433_cmd = mock_popen.call_args_list[1][0][0]
|
||||
assert rtl433_cmd[0] == 'rtl_433'
|
||||
assert '-r' in rtl433_cmd
|
||||
assert 'cs8:-' in rtl433_cmd
|
||||
# Verify hackrf_transfer command
|
||||
hackrf_cmd = mock_popen.call_args_list[0][0][0]
|
||||
assert os.path.basename(hackrf_cmd[0]) == 'hackrf_transfer'
|
||||
assert '-r' in hackrf_cmd
|
||||
|
||||
# Verify rtl_433 command
|
||||
rtl433_cmd = mock_popen.call_args_list[1][0][0]
|
||||
assert os.path.basename(rtl433_cmd[0]) == 'rtl_433'
|
||||
assert '-r' in rtl433_cmd
|
||||
assert 'cs8:-' in rtl433_cmd
|
||||
|
||||
# Both processes tracked
|
||||
assert manager._decode_hackrf_process is mock_hackrf_proc
|
||||
|
||||
+79
-67
@@ -6,14 +6,15 @@ Detects RTL-SDR devices via rtl_test and other SDR hardware via SoapySDR.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from .base import SDRCapabilities, SDRDevice, SDRType
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from utils.dependencies import get_tool_path
|
||||
|
||||
from .base import SDRCapabilities, SDRDevice, SDRType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -43,12 +44,7 @@ def _hackrf_probe_blocked() -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _check_tool(name: str) -> bool:
|
||||
"""Check if a tool is available in PATH."""
|
||||
return shutil.which(name) is not None
|
||||
|
||||
|
||||
def _get_capabilities_for_type(sdr_type: SDRType) -> SDRCapabilities:
|
||||
def _get_capabilities_for_type(sdr_type: SDRType) -> SDRCapabilities:
|
||||
"""Get default capabilities for an SDR type."""
|
||||
# Import here to avoid circular imports
|
||||
from .rtlsdr import RTLSDRCommandBuilder
|
||||
@@ -100,7 +96,7 @@ def _driver_to_sdr_type(driver: str) -> Optional[SDRType]:
|
||||
return mapping.get(driver.lower())
|
||||
|
||||
|
||||
def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
"""
|
||||
Detect RTL-SDR devices using rtl_test.
|
||||
|
||||
@@ -109,9 +105,10 @@ def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
"""
|
||||
devices: list[SDRDevice] = []
|
||||
|
||||
if not _check_tool('rtl_test'):
|
||||
logger.debug("rtl_test not found, skipping RTL-SDR detection")
|
||||
return devices
|
||||
rtl_test_path = get_tool_path('rtl_test')
|
||||
if not rtl_test_path:
|
||||
logger.debug("rtl_test not found, skipping RTL-SDR detection")
|
||||
return devices
|
||||
|
||||
try:
|
||||
import os
|
||||
@@ -122,11 +119,11 @@ def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
lib_paths = ['/usr/local/lib', '/opt/homebrew/lib']
|
||||
current_ld = env.get('DYLD_LIBRARY_PATH', '')
|
||||
env['DYLD_LIBRARY_PATH'] = ':'.join(lib_paths + [current_ld] if current_ld else lib_paths)
|
||||
result = subprocess.run(
|
||||
['rtl_test', '-t'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding='utf-8',
|
||||
result = subprocess.run(
|
||||
[rtl_test_path, '-t'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding='utf-8',
|
||||
errors='replace',
|
||||
timeout=5,
|
||||
env=env
|
||||
@@ -176,13 +173,14 @@ def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
return devices
|
||||
|
||||
|
||||
def _find_soapy_util() -> str | None:
|
||||
"""Find SoapySDR utility command (name varies by distribution)."""
|
||||
# Try different command names used across distributions
|
||||
for cmd in ['SoapySDRUtil', 'soapy_sdr_util', 'soapysdr-util']:
|
||||
if _check_tool(cmd):
|
||||
return cmd
|
||||
return None
|
||||
def _find_soapy_util() -> str | None:
|
||||
"""Find SoapySDR utility command (name varies by distribution)."""
|
||||
# Try different command names used across distributions
|
||||
for cmd in ['SoapySDRUtil', 'soapy_sdr_util', 'soapysdr-util']:
|
||||
tool_path = get_tool_path(cmd)
|
||||
if tool_path:
|
||||
return tool_path
|
||||
return None
|
||||
|
||||
|
||||
def _get_soapy_env() -> dict:
|
||||
@@ -324,7 +322,7 @@ def _add_soapy_device(
|
||||
))
|
||||
|
||||
|
||||
def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
"""
|
||||
Detect HackRF devices using native hackrf_info tool.
|
||||
|
||||
@@ -343,33 +341,46 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
|
||||
devices: list[SDRDevice] = []
|
||||
|
||||
if not _check_tool('hackrf_info'):
|
||||
_hackrf_cache = devices
|
||||
_hackrf_cache_ts = now
|
||||
return devices
|
||||
hackrf_info_path = get_tool_path('hackrf_info')
|
||||
if not hackrf_info_path:
|
||||
_hackrf_cache = devices
|
||||
_hackrf_cache_ts = now
|
||||
return devices
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['hackrf_info'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
result = subprocess.run(
|
||||
[hackrf_info_path],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
# Combine stdout + stderr: newer firmware may print to stderr,
|
||||
# and hackrf_info may exit non-zero when device is briefly busy
|
||||
# but still output valid info.
|
||||
output = result.stdout + result.stderr
|
||||
|
||||
# Parse hackrf_info output
|
||||
# Extract board name from "Board ID Number: X (Name)" and serial
|
||||
from .hackrf import HackRFCommandBuilder
|
||||
|
||||
serial_pattern = r'Serial number:\s*(\S+)'
|
||||
board_pattern = r'Board ID Number:\s*\d+\s*\(([^)]+)\)'
|
||||
|
||||
serials_found = re.findall(serial_pattern, output)
|
||||
boards_found = re.findall(board_pattern, output)
|
||||
output = f"{result.stdout or ''}\n{result.stderr or ''}"
|
||||
|
||||
# Parse hackrf_info output
|
||||
# Extract board name from "Board ID Number: X (Name)" and serial
|
||||
from .hackrf import HackRFCommandBuilder
|
||||
|
||||
serial_pattern = re.compile(
|
||||
r'^\s*Serial\s+number:\s*(.+)$',
|
||||
re.IGNORECASE | re.MULTILINE,
|
||||
)
|
||||
board_pattern = re.compile(
|
||||
r'Board\s+ID\s+Number:\s*\d+\s*\(([^)]+)\)',
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
serials_found = []
|
||||
for raw in serial_pattern.findall(output):
|
||||
# Normalise legacy formats like "0x1234 5678" to plain hex.
|
||||
serial = re.sub(r'0x', '', raw, flags=re.IGNORECASE)
|
||||
serial = re.sub(r'[^0-9A-Fa-f]', '', serial)
|
||||
if serial:
|
||||
serials_found.append(serial)
|
||||
boards_found = board_pattern.findall(output)
|
||||
|
||||
for i, serial in enumerate(serials_found):
|
||||
board_name = boards_found[i] if i < len(boards_found) else 'HackRF'
|
||||
@@ -383,11 +394,11 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
))
|
||||
|
||||
# Fallback: check if any HackRF found without serial
|
||||
if not devices and 'Found HackRF' in output:
|
||||
board_match = re.search(board_pattern, output)
|
||||
board_name = board_match.group(1) if board_match else 'HackRF'
|
||||
devices.append(SDRDevice(
|
||||
sdr_type=SDRType.HACKRF,
|
||||
if not devices and re.search(r'Found\s+HackRF', output, re.IGNORECASE):
|
||||
board_match = board_pattern.search(output)
|
||||
board_name = board_match.group(1) if board_match else 'HackRF'
|
||||
devices.append(SDRDevice(
|
||||
sdr_type=SDRType.HACKRF,
|
||||
index=0,
|
||||
name=board_name,
|
||||
serial='Unknown',
|
||||
@@ -403,7 +414,7 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
return devices
|
||||
|
||||
|
||||
def probe_rtlsdr_device(device_index: int) -> str | None:
|
||||
def probe_rtlsdr_device(device_index: int) -> str | None:
|
||||
"""Probe whether an RTL-SDR device is available at the USB level.
|
||||
|
||||
Runs a quick ``rtl_test`` invocation targeting a single device to
|
||||
@@ -417,10 +428,11 @@ def probe_rtlsdr_device(device_index: int) -> str | None:
|
||||
An error message string if the device cannot be opened,
|
||||
or ``None`` if the device is available.
|
||||
"""
|
||||
if not _check_tool('rtl_test'):
|
||||
# Can't probe without rtl_test — let the caller proceed and
|
||||
# surface errors from the actual decoder process instead.
|
||||
return None
|
||||
rtl_test_path = get_tool_path('rtl_test')
|
||||
if not rtl_test_path:
|
||||
# Can't probe without rtl_test — let the caller proceed and
|
||||
# surface errors from the actual decoder process instead.
|
||||
return None
|
||||
|
||||
try:
|
||||
import os
|
||||
@@ -437,11 +449,11 @@ def probe_rtlsdr_device(device_index: int) -> str | None:
|
||||
# Use Popen with early termination instead of run() with full timeout.
|
||||
# rtl_test prints device info to stderr quickly, then keeps running
|
||||
# its test loop. We kill it as soon as we see success or failure.
|
||||
proc = subprocess.Popen(
|
||||
['rtl_test', '-d', str(device_index), '-t'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
proc = subprocess.Popen(
|
||||
[rtl_test_path, '-d', str(device_index), '-t'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
env=env,
|
||||
)
|
||||
|
||||
|
||||
+125
-115
@@ -21,11 +21,12 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import BinaryIO, Callable
|
||||
|
||||
import numpy as np
|
||||
|
||||
from utils.logging import get_logger
|
||||
from utils.process import register_process, safe_terminate, unregister_process
|
||||
from utils.constants import (
|
||||
import numpy as np
|
||||
|
||||
from utils.dependencies import get_tool_path
|
||||
from utils.logging import get_logger
|
||||
from utils.process import register_process, safe_terminate, unregister_process
|
||||
from utils.constants import (
|
||||
SUBGHZ_TX_ALLOWED_BANDS,
|
||||
SUBGHZ_FREQ_MIN_MHZ,
|
||||
SUBGHZ_FREQ_MAX_MHZ,
|
||||
@@ -187,19 +188,23 @@ class SubGhzManager:
|
||||
except Exception as e:
|
||||
logger.error(f"Error in SubGHz callback: {e}")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Tool detection
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def check_hackrf(self) -> bool:
|
||||
if self._hackrf_available is None:
|
||||
self._hackrf_available = shutil.which('hackrf_transfer') is not None
|
||||
return self._hackrf_available
|
||||
|
||||
def check_hackrf_info(self) -> bool:
|
||||
if self._hackrf_info_available is None:
|
||||
self._hackrf_info_available = shutil.which('hackrf_info') is not None
|
||||
return self._hackrf_info_available
|
||||
# ------------------------------------------------------------------
|
||||
# Tool detection
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _resolve_tool(self, name: str) -> str | None:
|
||||
"""Resolve executable path via PATH first, then platform-aware fallbacks."""
|
||||
return shutil.which(name) or get_tool_path(name)
|
||||
|
||||
def check_hackrf(self) -> bool:
|
||||
if self._hackrf_available is None:
|
||||
self._hackrf_available = self._resolve_tool('hackrf_transfer') is not None
|
||||
return self._hackrf_available
|
||||
|
||||
def check_hackrf_info(self) -> bool:
|
||||
if self._hackrf_info_available is None:
|
||||
self._hackrf_info_available = self._resolve_tool('hackrf_info') is not None
|
||||
return self._hackrf_info_available
|
||||
|
||||
def check_hackrf_device(self) -> bool | None:
|
||||
"""Return True if a HackRF device is detected, False if not, or None if detection unavailable."""
|
||||
@@ -228,15 +233,15 @@ class SubGhzManager:
|
||||
return 'HackRF device not detected'
|
||||
return None
|
||||
|
||||
def check_rtl433(self) -> bool:
|
||||
if self._rtl433_available is None:
|
||||
self._rtl433_available = shutil.which('rtl_433') is not None
|
||||
return self._rtl433_available
|
||||
|
||||
def check_sweep(self) -> bool:
|
||||
if self._sweep_available is None:
|
||||
self._sweep_available = shutil.which('hackrf_sweep') is not None
|
||||
return self._sweep_available
|
||||
def check_rtl433(self) -> bool:
|
||||
if self._rtl433_available is None:
|
||||
self._rtl433_available = self._resolve_tool('rtl_433') is not None
|
||||
return self._rtl433_available
|
||||
|
||||
def check_sweep(self) -> bool:
|
||||
if self._sweep_available is None:
|
||||
self._sweep_available = self._resolve_tool('hackrf_sweep') is not None
|
||||
return self._sweep_available
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Status
|
||||
@@ -307,23 +312,24 @@ class SubGhzManager:
|
||||
# RECEIVE (IQ capture via hackrf_transfer -r)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def start_receive(
|
||||
self,
|
||||
frequency_hz: int,
|
||||
sample_rate: int = 2000000,
|
||||
def start_receive(
|
||||
self,
|
||||
frequency_hz: int,
|
||||
sample_rate: int = 2000000,
|
||||
lna_gain: int = 32,
|
||||
vga_gain: int = 20,
|
||||
trigger_enabled: bool = False,
|
||||
trigger_pre_ms: int = 350,
|
||||
trigger_post_ms: int = 700,
|
||||
device_serial: str | None = None,
|
||||
) -> dict:
|
||||
# Pre-lock: tool availability & device detection (blocking I/O)
|
||||
if not self.check_hackrf():
|
||||
return {'status': 'error', 'message': 'hackrf_transfer not found'}
|
||||
device_err = self._require_hackrf_device()
|
||||
if device_err:
|
||||
return {'status': 'error', 'message': device_err}
|
||||
trigger_post_ms: int = 700,
|
||||
device_serial: str | None = None,
|
||||
) -> dict:
|
||||
# Pre-lock: tool availability & device detection (blocking I/O)
|
||||
hackrf_transfer_path = self._resolve_tool('hackrf_transfer')
|
||||
if not hackrf_transfer_path:
|
||||
return {'status': 'error', 'message': 'hackrf_transfer not found'}
|
||||
device_err = self._require_hackrf_device()
|
||||
if device_err:
|
||||
return {'status': 'error', 'message': device_err}
|
||||
|
||||
with self._lock:
|
||||
if self.active_mode != 'idle':
|
||||
@@ -339,11 +345,11 @@ class SubGhzManager:
|
||||
basename = f"{freq_mhz:.3f}MHz_{ts}"
|
||||
iq_file = self._captures_dir / f"{basename}.iq"
|
||||
|
||||
cmd = [
|
||||
'hackrf_transfer',
|
||||
'-r', str(iq_file),
|
||||
'-f', str(frequency_hz),
|
||||
'-s', str(sample_rate),
|
||||
cmd = [
|
||||
hackrf_transfer_path,
|
||||
'-r', str(iq_file),
|
||||
'-f', str(frequency_hz),
|
||||
'-s', str(sample_rate),
|
||||
'-l', str(lna_gain),
|
||||
'-g', str(vga_gain),
|
||||
]
|
||||
@@ -1272,23 +1278,25 @@ class SubGhzManager:
|
||||
# DECODE (hackrf_transfer piped to rtl_433)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def start_decode(
|
||||
self,
|
||||
frequency_hz: int,
|
||||
sample_rate: int = 2_000_000,
|
||||
def start_decode(
|
||||
self,
|
||||
frequency_hz: int,
|
||||
sample_rate: int = 2_000_000,
|
||||
lna_gain: int = 32,
|
||||
vga_gain: int = 20,
|
||||
decode_profile: str = 'weather',
|
||||
device_serial: str | None = None,
|
||||
) -> dict:
|
||||
# Pre-lock: tool availability & device detection (blocking I/O)
|
||||
if not self.check_hackrf():
|
||||
return {'status': 'error', 'message': 'hackrf_transfer not found'}
|
||||
if not self.check_rtl433():
|
||||
return {'status': 'error', 'message': 'rtl_433 not found'}
|
||||
device_err = self._require_hackrf_device()
|
||||
if device_err:
|
||||
return {'status': 'error', 'message': device_err}
|
||||
decode_profile: str = 'weather',
|
||||
device_serial: str | None = None,
|
||||
) -> dict:
|
||||
# Pre-lock: tool availability & device detection (blocking I/O)
|
||||
hackrf_transfer_path = self._resolve_tool('hackrf_transfer')
|
||||
if not hackrf_transfer_path:
|
||||
return {'status': 'error', 'message': 'hackrf_transfer not found'}
|
||||
rtl433_path = self._resolve_tool('rtl_433')
|
||||
if not rtl433_path:
|
||||
return {'status': 'error', 'message': 'rtl_433 not found'}
|
||||
device_err = self._require_hackrf_device()
|
||||
if device_err:
|
||||
return {'status': 'error', 'message': device_err}
|
||||
|
||||
with self._lock:
|
||||
if self.active_mode != 'idle':
|
||||
@@ -1299,25 +1307,25 @@ class SubGhzManager:
|
||||
requested_sample_rate = int(sample_rate)
|
||||
stable_sample_rate = max(2_000_000, min(2_000_000, requested_sample_rate))
|
||||
|
||||
# Build hackrf_transfer command (producer: raw IQ to stdout)
|
||||
hackrf_cmd = [
|
||||
'hackrf_transfer',
|
||||
'-r', '-',
|
||||
'-f', str(frequency_hz),
|
||||
'-s', str(stable_sample_rate),
|
||||
# Build hackrf_transfer command (producer: raw IQ to stdout)
|
||||
hackrf_cmd = [
|
||||
hackrf_transfer_path,
|
||||
'-r', '-',
|
||||
'-f', str(frequency_hz),
|
||||
'-s', str(stable_sample_rate),
|
||||
'-l', str(max(SUBGHZ_LNA_GAIN_MIN, min(SUBGHZ_LNA_GAIN_MAX, lna_gain))),
|
||||
'-g', str(max(SUBGHZ_VGA_GAIN_MIN, min(SUBGHZ_VGA_GAIN_MAX, vga_gain))),
|
||||
]
|
||||
if device_serial:
|
||||
hackrf_cmd.extend(['-d', device_serial])
|
||||
|
||||
# Build rtl_433 command (consumer: reads IQ from stdin)
|
||||
# Feed signed 8-bit complex IQ directly from hackrf_transfer.
|
||||
rtl433_cmd = [
|
||||
'rtl_433',
|
||||
'-r', 'cs8:-',
|
||||
'-s', str(stable_sample_rate),
|
||||
'-f', str(frequency_hz),
|
||||
# Build rtl_433 command (consumer: reads IQ from stdin)
|
||||
# Feed signed 8-bit complex IQ directly from hackrf_transfer.
|
||||
rtl433_cmd = [
|
||||
rtl433_path,
|
||||
'-r', 'cs8:-',
|
||||
'-s', str(stable_sample_rate),
|
||||
'-f', str(frequency_hz),
|
||||
'-F', 'json',
|
||||
'-F', 'log',
|
||||
'-M', 'level',
|
||||
@@ -1936,21 +1944,22 @@ class SubGhzManager:
|
||||
except OSError as exc:
|
||||
logger.debug(f"Failed to remove TX temp file {path}: {exc}")
|
||||
|
||||
def transmit(
|
||||
self,
|
||||
capture_id: str,
|
||||
tx_gain: int = 20,
|
||||
def transmit(
|
||||
self,
|
||||
capture_id: str,
|
||||
tx_gain: int = 20,
|
||||
max_duration: int = 10,
|
||||
start_seconds: float | None = None,
|
||||
duration_seconds: float | None = None,
|
||||
device_serial: str | None = None,
|
||||
) -> dict:
|
||||
# Pre-lock: tool availability & device detection (blocking I/O)
|
||||
if not self.check_hackrf():
|
||||
return {'status': 'error', 'message': 'hackrf_transfer not found'}
|
||||
device_err = self._require_hackrf_device()
|
||||
if device_err:
|
||||
return {'status': 'error', 'message': device_err}
|
||||
duration_seconds: float | None = None,
|
||||
device_serial: str | None = None,
|
||||
) -> dict:
|
||||
# Pre-lock: tool availability & device detection (blocking I/O)
|
||||
hackrf_transfer_path = self._resolve_tool('hackrf_transfer')
|
||||
if not hackrf_transfer_path:
|
||||
return {'status': 'error', 'message': 'hackrf_transfer not found'}
|
||||
device_err = self._require_hackrf_device()
|
||||
if device_err:
|
||||
return {'status': 'error', 'message': device_err}
|
||||
|
||||
# Pre-lock: capture lookup, validation, and segment I/O (can be large)
|
||||
capture = self._load_capture(capture_id)
|
||||
@@ -2046,14 +2055,14 @@ class SubGhzManager:
|
||||
|
||||
# Clear any orphaned temp segment from a previous TX attempt.
|
||||
self._cleanup_tx_temp_file()
|
||||
if segment_path_for_cleanup:
|
||||
self._tx_temp_file = segment_path_for_cleanup
|
||||
|
||||
cmd = [
|
||||
'hackrf_transfer',
|
||||
'-t', str(tx_path),
|
||||
'-f', str(capture.frequency_hz),
|
||||
'-s', str(capture.sample_rate),
|
||||
if segment_path_for_cleanup:
|
||||
self._tx_temp_file = segment_path_for_cleanup
|
||||
|
||||
cmd = [
|
||||
hackrf_transfer_path,
|
||||
'-t', str(tx_path),
|
||||
'-f', str(capture.frequency_hz),
|
||||
'-s', str(capture.sample_rate),
|
||||
'-x', str(tx_gain),
|
||||
]
|
||||
if device_serial:
|
||||
@@ -2183,19 +2192,20 @@ class SubGhzManager:
|
||||
# SWEEP (hackrf_sweep)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def start_sweep(
|
||||
self,
|
||||
freq_start_mhz: float = 300.0,
|
||||
freq_end_mhz: float = 928.0,
|
||||
bin_width: int = 100000,
|
||||
device_serial: str | None = None,
|
||||
) -> dict:
|
||||
# Pre-lock: tool availability & device detection (blocking I/O)
|
||||
if not self.check_sweep():
|
||||
return {'status': 'error', 'message': 'hackrf_sweep not found'}
|
||||
device_err = self._require_hackrf_device()
|
||||
if device_err:
|
||||
return {'status': 'error', 'message': device_err}
|
||||
def start_sweep(
|
||||
self,
|
||||
freq_start_mhz: float = 300.0,
|
||||
freq_end_mhz: float = 928.0,
|
||||
bin_width: int = 100000,
|
||||
device_serial: str | None = None,
|
||||
) -> dict:
|
||||
# Pre-lock: tool availability & device detection (blocking I/O)
|
||||
hackrf_sweep_path = self._resolve_tool('hackrf_sweep')
|
||||
if not hackrf_sweep_path:
|
||||
return {'status': 'error', 'message': 'hackrf_sweep not found'}
|
||||
device_err = self._require_hackrf_device()
|
||||
if device_err:
|
||||
return {'status': 'error', 'message': device_err}
|
||||
|
||||
# Wait for previous sweep thread to exit (blocking) before lock
|
||||
if self._sweep_thread and self._sweep_thread.is_alive():
|
||||
@@ -2204,14 +2214,14 @@ class SubGhzManager:
|
||||
return {'status': 'error', 'message': 'Previous sweep still shutting down'}
|
||||
|
||||
with self._lock:
|
||||
if self.active_mode != 'idle':
|
||||
return {'status': 'error', 'message': f'Already running: {self.active_mode}'}
|
||||
|
||||
cmd = [
|
||||
'hackrf_sweep',
|
||||
'-f', f'{int(freq_start_mhz)}:{int(freq_end_mhz)}',
|
||||
'-w', str(bin_width),
|
||||
]
|
||||
if self.active_mode != 'idle':
|
||||
return {'status': 'error', 'message': f'Already running: {self.active_mode}'}
|
||||
|
||||
cmd = [
|
||||
hackrf_sweep_path,
|
||||
'-f', f'{int(freq_start_mhz)}:{int(freq_end_mhz)}',
|
||||
'-w', str(bin_width),
|
||||
]
|
||||
if device_serial:
|
||||
cmd.extend(['-d', device_serial])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user