mirror of
https://github.com/smittix/intercept.git
synced 2026-06-15 00:53:37 -07:00
Merge remote-tracking branch 'upstream/main'
This commit is contained in:
+33
-12
@@ -17,9 +17,9 @@ def _clear_detection_caches():
|
||||
yield
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/rtl_test')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_check_tool):
|
||||
def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_tool_path):
|
||||
"""Ignore malformed rtl_test rows that have an empty SN field."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ""
|
||||
@@ -40,9 +40,9 @@ def test_detect_rtlsdr_devices_filters_empty_serial_entries(mock_run, _mock_chec
|
||||
assert devices[0].serial == "1"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/rtl_test')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_rtlsdr_devices_uses_replace_decode_mode(mock_run, _mock_check_tool):
|
||||
def test_detect_rtlsdr_devices_uses_replace_decode_mode(mock_run, _mock_tool_path):
|
||||
"""Run rtl_test with tolerant decoding for malformed output bytes."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ""
|
||||
@@ -74,9 +74,9 @@ HACKRF_INFO_OUTPUT = (
|
||||
)
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_from_stdout(mock_run, _mock_check_tool):
|
||||
def test_detect_hackrf_from_stdout(mock_run, _mock_tool_path):
|
||||
"""Parse HackRF device info from stdout."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = HACKRF_INFO_OUTPUT
|
||||
@@ -92,9 +92,9 @@ def test_detect_hackrf_from_stdout(mock_run, _mock_check_tool):
|
||||
assert devices[0].index == 0
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_from_stderr(mock_run, _mock_check_tool):
|
||||
def test_detect_hackrf_from_stderr(mock_run, _mock_tool_path):
|
||||
"""Parse HackRF device info when output goes to stderr (newer firmware)."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ""
|
||||
@@ -109,9 +109,9 @@ def test_detect_hackrf_from_stderr(mock_run, _mock_check_tool):
|
||||
assert devices[0].serial == "0000000000000000a06063c8234e925f"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_check_tool):
|
||||
def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_tool_path):
|
||||
"""Parse HackRF info even when hackrf_info exits non-zero (device busy)."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.returncode = 1
|
||||
@@ -125,9 +125,9 @@ def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_check_tool
|
||||
assert devices[0].name == "HackRF One"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_fallback_no_serial(mock_run, _mock_check_tool):
|
||||
def test_detect_hackrf_fallback_no_serial(mock_run, _mock_tool_path):
|
||||
"""Fallback detection when serial is missing but 'Found HackRF' present."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = "Found HackRF\nBoard ID Number: 2 (HackRF One)\n"
|
||||
@@ -139,3 +139,24 @@ def test_detect_hackrf_fallback_no_serial(mock_run, _mock_check_tool):
|
||||
assert len(devices) == 1
|
||||
assert devices[0].name == "HackRF One"
|
||||
assert devices[0].serial == "Unknown"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection.get_tool_path', return_value='/usr/bin/hackrf_info')
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_parses_legacy_serial_format(mock_run, _mock_tool_path):
|
||||
"""Accept legacy 'Serial Number' casing and spaced hex format."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = (
|
||||
"Found HackRF\n"
|
||||
"Index: 0\n"
|
||||
"Serial Number: 0x00000000 00000000 a06063c8 234e925f\n"
|
||||
"Board ID Number: 3 (HackRF Pro)\n"
|
||||
)
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
devices = detect_hackrf_devices()
|
||||
|
||||
assert len(devices) == 1
|
||||
assert devices[0].name == "HackRF Pro"
|
||||
assert devices[0].serial == "0000000000000000a06063c8234e925f"
|
||||
|
||||
+59
-50
@@ -43,15 +43,16 @@ class TestSubGhzManagerInit:
|
||||
assert status['mode'] == 'idle'
|
||||
|
||||
|
||||
class TestToolDetection:
|
||||
class TestToolDetection:
|
||||
def test_check_hackrf_found(self, manager):
|
||||
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'):
|
||||
assert manager.check_hackrf() is True
|
||||
|
||||
def test_check_hackrf_not_found(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._hackrf_available = None # reset cache
|
||||
assert manager.check_hackrf() is False
|
||||
def test_check_hackrf_not_found(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None # reset cache
|
||||
assert manager.check_hackrf() is False
|
||||
|
||||
def test_check_rtl433_found(self, manager):
|
||||
with patch('shutil.which', return_value='/usr/bin/rtl_433'):
|
||||
@@ -62,13 +63,14 @@ class TestToolDetection:
|
||||
assert manager.check_sweep() is True
|
||||
|
||||
|
||||
class TestReceive:
|
||||
def test_start_receive_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
result = manager.start_receive(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'not found' in result['message']
|
||||
class TestReceive:
|
||||
def test_start_receive_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
result = manager.start_receive(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'not found' in result['message']
|
||||
|
||||
def test_start_receive_success(self, manager):
|
||||
mock_proc = MagicMock()
|
||||
@@ -164,11 +166,12 @@ class TestTxSafety:
|
||||
result = SubGhzManager.validate_tx_frequency(500000000) # 500 MHz
|
||||
assert result is not None
|
||||
|
||||
def test_transmit_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
result = manager.transmit(capture_id='abc123')
|
||||
assert result['status'] == 'error'
|
||||
def test_transmit_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
result = manager.transmit(capture_id='abc123')
|
||||
assert result['status'] == 'error'
|
||||
|
||||
def test_transmit_capture_not_found(self, manager):
|
||||
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
|
||||
@@ -464,12 +467,13 @@ class TestCaptureLibrary:
|
||||
assert all(c.fingerprint_group_size == 2 for c in captures)
|
||||
|
||||
|
||||
class TestSweep:
|
||||
def test_start_sweep_no_tool(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._sweep_available = None
|
||||
result = manager.start_sweep()
|
||||
assert result['status'] == 'error'
|
||||
class TestSweep:
|
||||
def test_start_sweep_no_tool(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._sweep_available = None
|
||||
result = manager.start_sweep()
|
||||
assert result['status'] == 'error'
|
||||
|
||||
def test_start_sweep_success(self, manager):
|
||||
import time as _time
|
||||
@@ -494,14 +498,15 @@ class TestSweep:
|
||||
assert result['status'] == 'not_running'
|
||||
|
||||
|
||||
class TestDecode:
|
||||
def test_start_decode_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
result = manager.start_decode(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'hackrf_transfer' in result['message']
|
||||
class TestDecode:
|
||||
def test_start_decode_no_hackrf(self, manager):
|
||||
with patch('shutil.which', return_value=None), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
result = manager.start_decode(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'hackrf_transfer' in result['message']
|
||||
|
||||
def test_start_decode_no_rtl433(self, manager):
|
||||
def which_side_effect(name):
|
||||
@@ -509,12 +514,13 @@ class TestDecode:
|
||||
return '/usr/bin/hackrf_transfer'
|
||||
return None
|
||||
|
||||
with patch('shutil.which', side_effect=which_side_effect):
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
result = manager.start_decode(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'rtl_433' in result['message']
|
||||
with patch('shutil.which', side_effect=which_side_effect), \
|
||||
patch('utils.subghz.get_tool_path', return_value=None):
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
result = manager.start_decode(frequency_hz=433920000)
|
||||
assert result['status'] == 'error'
|
||||
assert 'rtl_433' in result['message']
|
||||
|
||||
def test_start_decode_success(self, manager):
|
||||
mock_hackrf_proc = MagicMock()
|
||||
@@ -537,9 +543,12 @@ class TestDecode:
|
||||
return mock_hackrf_proc
|
||||
return mock_rtl433_proc
|
||||
|
||||
with patch('shutil.which', return_value='/usr/bin/tool'), \
|
||||
patch('subprocess.Popen', side_effect=popen_side_effect) as mock_popen, \
|
||||
patch('utils.subghz.register_process'):
|
||||
def which_side_effect(name):
|
||||
return f'/usr/bin/{name}'
|
||||
|
||||
with patch('shutil.which', side_effect=which_side_effect), \
|
||||
patch('subprocess.Popen', side_effect=popen_side_effect) as mock_popen, \
|
||||
patch('utils.subghz.register_process'):
|
||||
import time as _time
|
||||
manager._hackrf_available = None
|
||||
manager._rtl433_available = None
|
||||
@@ -556,16 +565,16 @@ class TestDecode:
|
||||
# Two processes: hackrf_transfer + rtl_433
|
||||
assert mock_popen.call_count == 2
|
||||
|
||||
# Verify hackrf_transfer command
|
||||
hackrf_cmd = mock_popen.call_args_list[0][0][0]
|
||||
assert hackrf_cmd[0] == 'hackrf_transfer'
|
||||
assert '-r' in hackrf_cmd
|
||||
|
||||
# Verify rtl_433 command
|
||||
rtl433_cmd = mock_popen.call_args_list[1][0][0]
|
||||
assert rtl433_cmd[0] == 'rtl_433'
|
||||
assert '-r' in rtl433_cmd
|
||||
assert 'cs8:-' in rtl433_cmd
|
||||
# Verify hackrf_transfer command
|
||||
hackrf_cmd = mock_popen.call_args_list[0][0][0]
|
||||
assert os.path.basename(hackrf_cmd[0]) == 'hackrf_transfer'
|
||||
assert '-r' in hackrf_cmd
|
||||
|
||||
# Verify rtl_433 command
|
||||
rtl433_cmd = mock_popen.call_args_list[1][0][0]
|
||||
assert os.path.basename(rtl433_cmd[0]) == 'rtl_433'
|
||||
assert '-r' in rtl433_cmd
|
||||
assert 'cs8:-' in rtl433_cmd
|
||||
|
||||
# Both processes tracked
|
||||
assert manager._decode_hackrf_process is mock_hackrf_proc
|
||||
|
||||
Reference in New Issue
Block a user