mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
Merge remote-tracking branch 'upstream/main'
This commit is contained in:
@@ -817,9 +817,8 @@ def start_adsb():
|
||||
bias_t=bias_t
|
||||
)
|
||||
|
||||
# For RTL-SDR, ensure we use the found dump1090 path
|
||||
if sdr_type == SDRType.RTL_SDR:
|
||||
cmd[0] = dump1090_path
|
||||
# Ensure we use the resolved binary path for all SDR types
|
||||
cmd[0] = dump1090_path
|
||||
|
||||
try:
|
||||
logger.info(f"Starting dump1090 with device index {device}: {' '.join(cmd)}")
|
||||
@@ -860,12 +859,22 @@ def start_adsb():
|
||||
error_type = 'START_FAILED'
|
||||
stderr_lower = stderr_output.lower()
|
||||
|
||||
sdr_label = sdr_type.value
|
||||
|
||||
if 'usb_claim_interface' in stderr_lower or 'libusb_error_busy' in stderr_lower or 'device or resource busy' in stderr_lower:
|
||||
error_msg = 'SDR device is busy. Another process may be using it.'
|
||||
suggestion = 'Try: 1) Stop other SDR applications, 2) Run "pkill -f rtl_" to kill stale processes, or 3) Remove and reinsert the SDR device.'
|
||||
error_type = 'DEVICE_BUSY'
|
||||
elif 'no hackrf boards found' in stderr_lower or 'hackrf_open' in stderr_lower:
|
||||
error_msg = f'{sdr_label} device not found.'
|
||||
suggestion = 'Ensure the HackRF is connected. Try removing and reinserting the device.'
|
||||
error_type = 'DEVICE_NOT_FOUND'
|
||||
elif 'soapysdr not found' in stderr_lower or 'soapy' in stderr_lower and 'not found' in stderr_lower:
|
||||
error_msg = f'SoapySDR driver not found for {sdr_label}.'
|
||||
suggestion = f'Install SoapySDR and the {sdr_label} module (e.g., soapysdr-module-hackrf).'
|
||||
error_type = 'DRIVER_NOT_FOUND'
|
||||
elif 'no supported devices' in stderr_lower or 'no rtl-sdr' in stderr_lower or 'failed to open' in stderr_lower:
|
||||
error_msg = 'RTL-SDR device not found.'
|
||||
error_msg = f'{sdr_label} device not found.'
|
||||
suggestion = 'Ensure the device is connected. Try removing and reinserting the SDR.'
|
||||
error_type = 'DEVICE_NOT_FOUND'
|
||||
elif 'kernel driver is active' in stderr_lower or 'dvb' in stderr_lower:
|
||||
@@ -873,14 +882,14 @@ def start_adsb():
|
||||
suggestion = 'Blacklist the DVB drivers: Go to Settings > Hardware > "Blacklist DVB Drivers" or run "sudo rmmod dvb_usb_rtl28xxu".'
|
||||
error_type = 'KERNEL_DRIVER'
|
||||
elif 'permission' in stderr_lower or 'access' in stderr_lower:
|
||||
error_msg = 'Permission denied accessing RTL-SDR device.'
|
||||
suggestion = 'Run Intercept with sudo, or add udev rules for RTL-SDR devices.'
|
||||
error_msg = f'Permission denied accessing {sdr_label} device.'
|
||||
suggestion = f'Run Intercept with sudo, or add udev rules for {sdr_label} devices.'
|
||||
error_type = 'PERMISSION_DENIED'
|
||||
elif sdr_type == SDRType.RTL_SDR:
|
||||
error_msg = 'dump1090 failed to start.'
|
||||
suggestion = 'Try removing and reinserting the SDR device, or check if another application is using it.'
|
||||
else:
|
||||
error_msg = f'ADS-B decoder failed to start for {sdr_type.value}.'
|
||||
error_msg = f'ADS-B decoder failed to start for {sdr_label}.'
|
||||
suggestion = 'Ensure readsb is installed with SoapySDR support and the device is connected.'
|
||||
|
||||
full_msg = f'{error_msg} {suggestion}'
|
||||
|
||||
@@ -1890,19 +1890,179 @@ def _parse_rtl_power_line(line: str) -> tuple[str | None, float | None, float |
|
||||
return timestamp, None, None, []
|
||||
|
||||
|
||||
def _queue_waterfall_error(message: str) -> None:
|
||||
"""Push an error message onto the waterfall SSE queue."""
|
||||
try:
|
||||
waterfall_queue.put_nowait({
|
||||
'type': 'waterfall_error',
|
||||
'message': message,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
})
|
||||
except queue.Full:
|
||||
pass
|
||||
|
||||
|
||||
def _waterfall_loop():
|
||||
"""Continuous rtl_power sweep loop emitting waterfall data."""
|
||||
"""Continuous waterfall sweep loop emitting FFT data."""
|
||||
global waterfall_running, waterfall_process
|
||||
|
||||
def _queue_waterfall_error(message: str) -> None:
|
||||
try:
|
||||
waterfall_queue.put_nowait({
|
||||
'type': 'waterfall_error',
|
||||
'message': message,
|
||||
sdr_type_str = waterfall_config.get('sdr_type', 'rtlsdr')
|
||||
try:
|
||||
sdr_type = SDRType(sdr_type_str)
|
||||
except ValueError:
|
||||
sdr_type = SDRType.RTL_SDR
|
||||
|
||||
if sdr_type == SDRType.RTL_SDR:
|
||||
_waterfall_loop_rtl_power()
|
||||
else:
|
||||
_waterfall_loop_iq(sdr_type)
|
||||
|
||||
|
||||
def _waterfall_loop_iq(sdr_type: SDRType):
|
||||
"""Waterfall loop using rx_sdr IQ capture + FFT for HackRF/SoapySDR devices."""
|
||||
global waterfall_running, waterfall_process
|
||||
|
||||
start_freq = waterfall_config['start_freq']
|
||||
end_freq = waterfall_config['end_freq']
|
||||
gain = waterfall_config['gain']
|
||||
device = waterfall_config['device']
|
||||
interval = float(waterfall_config.get('interval', 0.4))
|
||||
|
||||
# Use center frequency and sample rate to cover the requested span
|
||||
center_mhz = (start_freq + end_freq) / 2.0
|
||||
span_hz = (end_freq - start_freq) * 1e6
|
||||
# Pick a sample rate that covers the span (minimum 2 MHz for HackRF)
|
||||
sample_rate = max(2000000, int(span_hz))
|
||||
# Cap to sensible maximum
|
||||
sample_rate = min(sample_rate, 20000000)
|
||||
|
||||
sdr_device = SDRFactory.create_default_device(sdr_type, index=device)
|
||||
builder = SDRFactory.get_builder(sdr_type)
|
||||
|
||||
cmd = builder.build_iq_capture_command(
|
||||
device=sdr_device,
|
||||
frequency_mhz=center_mhz,
|
||||
sample_rate=sample_rate,
|
||||
gain=float(gain),
|
||||
)
|
||||
|
||||
fft_size = min(int(waterfall_config.get('max_bins') or 1024), 4096)
|
||||
|
||||
try:
|
||||
waterfall_process = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
|
||||
# Detect immediate startup failures
|
||||
time.sleep(0.35)
|
||||
if waterfall_process.poll() is not None:
|
||||
stderr_text = ''
|
||||
try:
|
||||
if waterfall_process.stderr:
|
||||
stderr_text = waterfall_process.stderr.read().decode('utf-8', errors='ignore').strip()
|
||||
except Exception:
|
||||
stderr_text = ''
|
||||
msg = stderr_text or f'IQ capture exited early (code {waterfall_process.returncode})'
|
||||
logger.error(f"Waterfall startup failed: {msg}")
|
||||
_queue_waterfall_error(msg)
|
||||
return
|
||||
|
||||
if not waterfall_process.stdout:
|
||||
_queue_waterfall_error('IQ capture stdout unavailable')
|
||||
return
|
||||
|
||||
# Read IQ samples and compute FFT
|
||||
# CU8 format: interleaved unsigned 8-bit I/Q pairs
|
||||
bytes_per_sample = 2 # 1 byte I + 1 byte Q
|
||||
chunk_bytes = fft_size * bytes_per_sample
|
||||
received_any = False
|
||||
|
||||
while waterfall_running:
|
||||
raw = waterfall_process.stdout.read(chunk_bytes)
|
||||
if not raw or len(raw) < chunk_bytes:
|
||||
if waterfall_process.poll() is not None:
|
||||
break
|
||||
continue
|
||||
|
||||
received_any = True
|
||||
|
||||
# Convert CU8 to complex float: center at 127.5
|
||||
iq = struct.unpack(f'{fft_size * 2}B', raw)
|
||||
# Compute power spectrum via FFT
|
||||
real_parts = [(iq[i * 2] - 127.5) / 127.5 for i in range(fft_size)]
|
||||
imag_parts = [(iq[i * 2 + 1] - 127.5) / 127.5 for i in range(fft_size)]
|
||||
|
||||
bins: list[float] = []
|
||||
try:
|
||||
# Try numpy if available for efficient FFT
|
||||
import numpy as np
|
||||
samples = np.array(real_parts, dtype=np.float32) + 1j * np.array(imag_parts, dtype=np.float32)
|
||||
# Apply Hann window
|
||||
window = np.hanning(fft_size)
|
||||
samples *= window
|
||||
spectrum = np.fft.fftshift(np.fft.fft(samples))
|
||||
power_db = 10.0 * np.log10(np.abs(spectrum) ** 2 + 1e-10)
|
||||
bins = power_db.tolist()
|
||||
except ImportError:
|
||||
# Fallback: compute magnitude without full FFT
|
||||
# Just report raw magnitudes per sample as approximate power
|
||||
for i in range(fft_size):
|
||||
mag = math.sqrt(real_parts[i] ** 2 + imag_parts[i] ** 2)
|
||||
power = 10.0 * math.log10(mag ** 2 + 1e-10)
|
||||
bins.append(power)
|
||||
|
||||
max_bins = int(waterfall_config.get('max_bins') or 0)
|
||||
if max_bins > 0 and len(bins) > max_bins:
|
||||
bins = _downsample_bins(bins, max_bins)
|
||||
|
||||
msg = {
|
||||
'type': 'waterfall_sweep',
|
||||
'start_freq': start_freq,
|
||||
'end_freq': end_freq,
|
||||
'bins': bins,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
})
|
||||
except queue.Full:
|
||||
pass
|
||||
}
|
||||
try:
|
||||
waterfall_queue.put_nowait(msg)
|
||||
except queue.Full:
|
||||
try:
|
||||
waterfall_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
pass
|
||||
try:
|
||||
waterfall_queue.put_nowait(msg)
|
||||
except queue.Full:
|
||||
pass
|
||||
|
||||
# Throttle to respect interval
|
||||
time.sleep(interval)
|
||||
|
||||
if waterfall_running and not received_any:
|
||||
_queue_waterfall_error(f'No IQ data received from {sdr_type.value}')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Waterfall IQ loop error: {e}")
|
||||
_queue_waterfall_error(f"Waterfall loop error: {e}")
|
||||
finally:
|
||||
waterfall_running = False
|
||||
if waterfall_process and waterfall_process.poll() is None:
|
||||
try:
|
||||
waterfall_process.terminate()
|
||||
waterfall_process.wait(timeout=1)
|
||||
except Exception:
|
||||
try:
|
||||
waterfall_process.kill()
|
||||
except Exception:
|
||||
pass
|
||||
waterfall_process = None
|
||||
logger.info("Waterfall IQ loop stopped")
|
||||
|
||||
|
||||
def _waterfall_loop_rtl_power():
|
||||
"""Continuous rtl_power sweep loop emitting waterfall data."""
|
||||
global waterfall_running, waterfall_process
|
||||
|
||||
rtl_power_path = find_rtl_power()
|
||||
if not rtl_power_path:
|
||||
@@ -2081,17 +2241,28 @@ def start_waterfall() -> Response:
|
||||
'config': waterfall_config,
|
||||
})
|
||||
|
||||
if not find_rtl_power():
|
||||
return jsonify({'status': 'error', 'message': 'rtl_power not found'}), 503
|
||||
|
||||
data = request.json or {}
|
||||
|
||||
# Determine SDR type
|
||||
sdr_type_str = data.get('sdr_type', 'rtlsdr')
|
||||
try:
|
||||
sdr_type = SDRType(sdr_type_str)
|
||||
except ValueError:
|
||||
sdr_type = SDRType.RTL_SDR
|
||||
sdr_type_str = sdr_type.value
|
||||
|
||||
# RTL-SDR uses rtl_power; other types use rx_sdr via IQ capture
|
||||
if sdr_type == SDRType.RTL_SDR:
|
||||
if not find_rtl_power():
|
||||
return jsonify({'status': 'error', 'message': 'rtl_power not found'}), 503
|
||||
|
||||
try:
|
||||
waterfall_config['start_freq'] = float(data.get('start_freq', 88.0))
|
||||
waterfall_config['end_freq'] = float(data.get('end_freq', 108.0))
|
||||
waterfall_config['bin_size'] = int(data.get('bin_size', 10000))
|
||||
waterfall_config['gain'] = int(data.get('gain', 40))
|
||||
waterfall_config['device'] = int(data.get('device', 0))
|
||||
waterfall_config['sdr_type'] = sdr_type_str
|
||||
if data.get('interval') is not None:
|
||||
interval = float(data.get('interval', waterfall_config['interval']))
|
||||
if interval < 0.1 or interval > 5:
|
||||
@@ -2116,12 +2287,12 @@ def start_waterfall() -> Response:
|
||||
pass
|
||||
|
||||
# Claim SDR device
|
||||
error = app_module.claim_sdr_device(waterfall_config['device'], 'waterfall', 'rtlsdr')
|
||||
error = app_module.claim_sdr_device(waterfall_config['device'], 'waterfall', sdr_type_str)
|
||||
if error:
|
||||
return jsonify({'status': 'error', 'error_type': 'DEVICE_BUSY', 'message': error}), 409
|
||||
|
||||
waterfall_active_device = waterfall_config['device']
|
||||
waterfall_active_sdr_type = 'rtlsdr'
|
||||
waterfall_active_sdr_type = sdr_type_str
|
||||
waterfall_running = True
|
||||
waterfall_thread = threading.Thread(target=_waterfall_loop, daemon=True)
|
||||
waterfall_thread.start()
|
||||
|
||||
88
tests/test_hackrf_commands.py
Normal file
88
tests/test_hackrf_commands.py
Normal file
@@ -0,0 +1,88 @@
|
||||
"""Tests for HackRF command builder."""
|
||||
|
||||
from utils.sdr.base import SDRDevice, SDRType
|
||||
from utils.sdr.hackrf import HackRFCommandBuilder
|
||||
|
||||
|
||||
def _make_device(serial: str = 'abc123') -> SDRDevice:
|
||||
return SDRDevice(
|
||||
sdr_type=SDRType.HACKRF,
|
||||
index=0,
|
||||
name='HackRF One',
|
||||
serial=serial,
|
||||
driver='hackrf',
|
||||
capabilities=HackRFCommandBuilder.CAPABILITIES,
|
||||
)
|
||||
|
||||
|
||||
class TestHackRFCapabilities:
|
||||
def test_gain_max_reflects_combined_lna_vga(self):
|
||||
"""gain_max should be LNA(40) + VGA(62) = 102."""
|
||||
assert HackRFCommandBuilder.CAPABILITIES.gain_max == 102.0
|
||||
|
||||
def test_frequency_range(self):
|
||||
caps = HackRFCommandBuilder.CAPABILITIES
|
||||
assert caps.freq_min_mhz == 1.0
|
||||
assert caps.freq_max_mhz == 6000.0
|
||||
|
||||
def test_tx_capable(self):
|
||||
assert HackRFCommandBuilder.CAPABILITIES.tx_capable is True
|
||||
|
||||
|
||||
class TestSplitGain:
|
||||
def test_low_gain_all_to_lna(self):
|
||||
builder = HackRFCommandBuilder()
|
||||
lna, vga = builder._split_gain(30)
|
||||
assert lna == 30
|
||||
assert vga == 0
|
||||
|
||||
def test_gain_at_lna_max(self):
|
||||
builder = HackRFCommandBuilder()
|
||||
lna, vga = builder._split_gain(40)
|
||||
assert lna == 40
|
||||
assert vga == 0
|
||||
|
||||
def test_high_gain_splits_across_stages(self):
|
||||
builder = HackRFCommandBuilder()
|
||||
lna, vga = builder._split_gain(80)
|
||||
assert lna == 40
|
||||
assert vga == 40
|
||||
|
||||
def test_max_combined_gain(self):
|
||||
builder = HackRFCommandBuilder()
|
||||
lna, vga = builder._split_gain(102)
|
||||
assert lna == 40
|
||||
assert vga == 62
|
||||
|
||||
|
||||
class TestBuildAdsbCommand:
|
||||
def test_contains_soapysdr_device_type(self):
|
||||
builder = HackRFCommandBuilder()
|
||||
cmd = builder.build_adsb_command(_make_device(), gain=40)
|
||||
assert '--device-type' in cmd
|
||||
assert 'soapysdr' in cmd
|
||||
|
||||
def test_includes_serial_in_device_string(self):
|
||||
builder = HackRFCommandBuilder()
|
||||
cmd = builder.build_adsb_command(_make_device(serial='deadbeef'), gain=40)
|
||||
device_idx = cmd.index('--device')
|
||||
assert 'deadbeef' in cmd[device_idx + 1]
|
||||
|
||||
|
||||
class TestBuildIQCaptureCommand:
|
||||
def test_outputs_cu8_to_stdout(self):
|
||||
builder = HackRFCommandBuilder()
|
||||
cmd = builder.build_iq_capture_command(
|
||||
_make_device(), frequency_mhz=100.0, sample_rate=2048000, gain=40
|
||||
)
|
||||
assert '-F' in cmd
|
||||
assert 'CU8' in cmd
|
||||
assert cmd[-1] == '-'
|
||||
|
||||
def test_gain_split_in_command(self):
|
||||
builder = HackRFCommandBuilder()
|
||||
cmd = builder.build_iq_capture_command(
|
||||
_make_device(), frequency_mhz=100.0, gain=80
|
||||
)
|
||||
gain_idx = cmd.index('-g')
|
||||
assert cmd[gain_idx + 1] == 'LNA=40,VGA=40'
|
||||
@@ -1,9 +1,20 @@
|
||||
"""Tests for RTL-SDR detection parsing."""
|
||||
"""Tests for SDR detection parsing (RTL-SDR and HackRF)."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
import utils.sdr.detection as detection_mod
|
||||
from utils.sdr.base import SDRType
|
||||
from utils.sdr.detection import detect_rtlsdr_devices
|
||||
from utils.sdr.detection import detect_hackrf_devices, detect_rtlsdr_devices
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_detection_caches():
|
||||
"""Reset detection caches before each test."""
|
||||
detection_mod._hackrf_cache = []
|
||||
detection_mod._hackrf_cache_ts = 0.0
|
||||
yield
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@@ -44,3 +55,87 @@ def test_detect_rtlsdr_devices_uses_replace_decode_mode(mock_run, _mock_check_to
|
||||
assert kwargs["text"] is True
|
||||
assert kwargs["encoding"] == "utf-8"
|
||||
assert kwargs["errors"] == "replace"
|
||||
|
||||
|
||||
# ---- HackRF detection tests ----
|
||||
|
||||
HACKRF_INFO_OUTPUT = (
|
||||
"hackrf_info version: 2024.02.1\n"
|
||||
"libhackrf version: 2024.02.1 (0.9)\n"
|
||||
"Found HackRF\n"
|
||||
"Index: 0\n"
|
||||
"Serial number: 0000000000000000a06063c8234e925f\n"
|
||||
"Board ID Number: 2 (HackRF One)\n"
|
||||
"Firmware Version: 2024.02.1 (API:1.08)\n"
|
||||
"Part ID Number: 0xa000cb3c 0x00614764\n"
|
||||
"Hardware Revision: r9\n"
|
||||
"Hardware supported by installed firmware:\n"
|
||||
" HackRF One\n"
|
||||
)
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_from_stdout(mock_run, _mock_check_tool):
|
||||
"""Parse HackRF device info from stdout."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = HACKRF_INFO_OUTPUT
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
devices = detect_hackrf_devices()
|
||||
|
||||
assert len(devices) == 1
|
||||
assert devices[0].sdr_type == SDRType.HACKRF
|
||||
assert devices[0].name == "HackRF One"
|
||||
assert devices[0].serial == "0000000000000000a06063c8234e925f"
|
||||
assert devices[0].index == 0
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_from_stderr(mock_run, _mock_check_tool):
|
||||
"""Parse HackRF device info when output goes to stderr (newer firmware)."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ""
|
||||
mock_result.stderr = HACKRF_INFO_OUTPUT
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
devices = detect_hackrf_devices()
|
||||
|
||||
assert len(devices) == 1
|
||||
assert devices[0].sdr_type == SDRType.HACKRF
|
||||
assert devices[0].name == "HackRF One"
|
||||
assert devices[0].serial == "0000000000000000a06063c8234e925f"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_nonzero_exit_with_valid_output(mock_run, _mock_check_tool):
|
||||
"""Parse HackRF info even when hackrf_info exits non-zero (device busy)."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.returncode = 1
|
||||
mock_result.stdout = ""
|
||||
mock_result.stderr = HACKRF_INFO_OUTPUT
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
devices = detect_hackrf_devices()
|
||||
|
||||
assert len(devices) == 1
|
||||
assert devices[0].name == "HackRF One"
|
||||
|
||||
|
||||
@patch('utils.sdr.detection._check_tool', return_value=True)
|
||||
@patch('utils.sdr.detection.subprocess.run')
|
||||
def test_detect_hackrf_fallback_no_serial(mock_run, _mock_check_tool):
|
||||
"""Fallback detection when serial is missing but 'Found HackRF' present."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = "Found HackRF\nBoard ID Number: 2 (HackRF One)\n"
|
||||
mock_result.stderr = ""
|
||||
mock_run.return_value = mock_result
|
||||
|
||||
devices = detect_hackrf_devices()
|
||||
|
||||
assert len(devices) == 1
|
||||
assert devices[0].name == "HackRF One"
|
||||
assert devices[0].serial == "Unknown"
|
||||
|
||||
@@ -356,6 +356,11 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
timeout=5
|
||||
)
|
||||
|
||||
# Combine stdout + stderr: newer firmware may print to stderr,
|
||||
# and hackrf_info may exit non-zero when device is briefly busy
|
||||
# but still output valid info.
|
||||
output = result.stdout + result.stderr
|
||||
|
||||
# Parse hackrf_info output
|
||||
# Extract board name from "Board ID Number: X (Name)" and serial
|
||||
from .hackrf import HackRFCommandBuilder
|
||||
@@ -363,8 +368,8 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
serial_pattern = r'Serial number:\s*(\S+)'
|
||||
board_pattern = r'Board ID Number:\s*\d+\s*\(([^)]+)\)'
|
||||
|
||||
serials_found = re.findall(serial_pattern, result.stdout)
|
||||
boards_found = re.findall(board_pattern, result.stdout)
|
||||
serials_found = re.findall(serial_pattern, output)
|
||||
boards_found = re.findall(board_pattern, output)
|
||||
|
||||
for i, serial in enumerate(serials_found):
|
||||
board_name = boards_found[i] if i < len(boards_found) else 'HackRF'
|
||||
@@ -378,8 +383,8 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
))
|
||||
|
||||
# Fallback: check if any HackRF found without serial
|
||||
if not devices and 'Found HackRF' in result.stdout:
|
||||
board_match = re.search(board_pattern, result.stdout)
|
||||
if not devices and 'Found HackRF' in output:
|
||||
board_match = re.search(board_pattern, output)
|
||||
board_name = board_match.group(1) if board_match else 'HackRF'
|
||||
devices.append(SDRDevice(
|
||||
sdr_type=SDRType.HACKRF,
|
||||
|
||||
@@ -22,7 +22,7 @@ class HackRFCommandBuilder(CommandBuilder):
|
||||
freq_min_mhz=1.0, # 1 MHz
|
||||
freq_max_mhz=6000.0, # 6 GHz
|
||||
gain_min=0.0,
|
||||
gain_max=62.0, # LNA (0-40) + VGA (0-62)
|
||||
gain_max=102.0, # LNA (0-40) + VGA (0-62)
|
||||
sample_rates=[2000000, 4000000, 8000000, 10000000, 20000000],
|
||||
supports_bias_t=True,
|
||||
supports_ppm=False,
|
||||
|
||||
Reference in New Issue
Block a user