mirror of
https://github.com/smittix/intercept.git
synced 2026-04-30 17:49:58 -07:00
fix(modes): deep-linked mode scripts fail when body not yet parsed
ensureModeScript() used document.body.appendChild() to load lazy mode scripts, but the preload for ?mode= query params runs in <head> before <body> exists, causing all deep-linked modes to silently fail. Also fix cross-mode handoffs (BT→BT Locate, WiFi→WiFi Locate, Spy Stations→Waterfall) that assumed target module was already loaded. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,15 +6,15 @@ Detects RTL-SDR devices via rtl_test and other SDR hardware via SoapySDR.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from utils.dependencies import get_tool_path
|
||||
|
||||
from .base import SDRCapabilities, SDRDevice, SDRType
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from utils.dependencies import get_tool_path
|
||||
|
||||
from .base import SDRCapabilities, SDRDevice, SDRType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -44,7 +44,7 @@ def _hackrf_probe_blocked() -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _get_capabilities_for_type(sdr_type: SDRType) -> SDRCapabilities:
|
||||
def _get_capabilities_for_type(sdr_type: SDRType) -> SDRCapabilities:
|
||||
"""Get default capabilities for an SDR type."""
|
||||
# Import here to avoid circular imports
|
||||
from .rtlsdr import RTLSDRCommandBuilder
|
||||
@@ -96,7 +96,7 @@ def _driver_to_sdr_type(driver: str) -> Optional[SDRType]:
|
||||
return mapping.get(driver.lower())
|
||||
|
||||
|
||||
def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
"""
|
||||
Detect RTL-SDR devices using rtl_test.
|
||||
|
||||
@@ -105,10 +105,10 @@ def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
"""
|
||||
devices: list[SDRDevice] = []
|
||||
|
||||
rtl_test_path = get_tool_path('rtl_test')
|
||||
if not rtl_test_path:
|
||||
logger.debug("rtl_test not found, skipping RTL-SDR detection")
|
||||
return devices
|
||||
rtl_test_path = get_tool_path('rtl_test')
|
||||
if not rtl_test_path:
|
||||
logger.debug("rtl_test not found, skipping RTL-SDR detection")
|
||||
return devices
|
||||
|
||||
try:
|
||||
import os
|
||||
@@ -119,15 +119,19 @@ def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
lib_paths = ['/usr/local/lib', '/opt/homebrew/lib']
|
||||
current_ld = env.get('DYLD_LIBRARY_PATH', '')
|
||||
env['DYLD_LIBRARY_PATH'] = ':'.join(lib_paths + [current_ld] if current_ld else lib_paths)
|
||||
result = subprocess.run(
|
||||
[rtl_test_path, '-t'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding='utf-8',
|
||||
errors='replace',
|
||||
timeout=5,
|
||||
env=env
|
||||
)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[rtl_test_path, '-t'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding='utf-8',
|
||||
errors='replace',
|
||||
timeout=5,
|
||||
env=env
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning("rtl_test timed out after 5s")
|
||||
return []
|
||||
output = result.stderr + result.stdout
|
||||
|
||||
# Parse device info from rtl_test output
|
||||
@@ -173,14 +177,14 @@ def detect_rtlsdr_devices() -> list[SDRDevice]:
|
||||
return devices
|
||||
|
||||
|
||||
def _find_soapy_util() -> str | None:
|
||||
"""Find SoapySDR utility command (name varies by distribution)."""
|
||||
# Try different command names used across distributions
|
||||
for cmd in ['SoapySDRUtil', 'soapy_sdr_util', 'soapysdr-util']:
|
||||
tool_path = get_tool_path(cmd)
|
||||
if tool_path:
|
||||
return tool_path
|
||||
return None
|
||||
def _find_soapy_util() -> str | None:
|
||||
"""Find SoapySDR utility command (name varies by distribution)."""
|
||||
# Try different command names used across distributions
|
||||
for cmd in ['SoapySDRUtil', 'soapy_sdr_util', 'soapysdr-util']:
|
||||
tool_path = get_tool_path(cmd)
|
||||
if tool_path:
|
||||
return tool_path
|
||||
return None
|
||||
|
||||
|
||||
def _get_soapy_env() -> dict:
|
||||
@@ -322,7 +326,7 @@ def _add_soapy_device(
|
||||
))
|
||||
|
||||
|
||||
def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
"""
|
||||
Detect HackRF devices using native hackrf_info tool.
|
||||
|
||||
@@ -341,46 +345,46 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
|
||||
devices: list[SDRDevice] = []
|
||||
|
||||
hackrf_info_path = get_tool_path('hackrf_info')
|
||||
if not hackrf_info_path:
|
||||
_hackrf_cache = devices
|
||||
_hackrf_cache_ts = now
|
||||
return devices
|
||||
hackrf_info_path = get_tool_path('hackrf_info')
|
||||
if not hackrf_info_path:
|
||||
_hackrf_cache = devices
|
||||
_hackrf_cache_ts = now
|
||||
return devices
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[hackrf_info_path],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
result = subprocess.run(
|
||||
[hackrf_info_path],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
# Combine stdout + stderr: newer firmware may print to stderr,
|
||||
# and hackrf_info may exit non-zero when device is briefly busy
|
||||
# but still output valid info.
|
||||
output = f"{result.stdout or ''}\n{result.stderr or ''}"
|
||||
|
||||
# Parse hackrf_info output
|
||||
# Extract board name from "Board ID Number: X (Name)" and serial
|
||||
from .hackrf import HackRFCommandBuilder
|
||||
|
||||
serial_pattern = re.compile(
|
||||
r'^\s*Serial\s+number:\s*(.+)$',
|
||||
re.IGNORECASE | re.MULTILINE,
|
||||
)
|
||||
board_pattern = re.compile(
|
||||
r'Board\s+ID\s+Number:\s*\d+\s*\(([^)]+)\)',
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
serials_found = []
|
||||
for raw in serial_pattern.findall(output):
|
||||
# Normalise legacy formats like "0x1234 5678" to plain hex.
|
||||
serial = re.sub(r'0x', '', raw, flags=re.IGNORECASE)
|
||||
serial = re.sub(r'[^0-9A-Fa-f]', '', serial)
|
||||
if serial:
|
||||
serials_found.append(serial)
|
||||
boards_found = board_pattern.findall(output)
|
||||
output = f"{result.stdout or ''}\n{result.stderr or ''}"
|
||||
|
||||
# Parse hackrf_info output
|
||||
# Extract board name from "Board ID Number: X (Name)" and serial
|
||||
from .hackrf import HackRFCommandBuilder
|
||||
|
||||
serial_pattern = re.compile(
|
||||
r'^\s*Serial\s+number:\s*(.+)$',
|
||||
re.IGNORECASE | re.MULTILINE,
|
||||
)
|
||||
board_pattern = re.compile(
|
||||
r'Board\s+ID\s+Number:\s*\d+\s*\(([^)]+)\)',
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
serials_found = []
|
||||
for raw in serial_pattern.findall(output):
|
||||
# Normalise legacy formats like "0x1234 5678" to plain hex.
|
||||
serial = re.sub(r'0x', '', raw, flags=re.IGNORECASE)
|
||||
serial = re.sub(r'[^0-9A-Fa-f]', '', serial)
|
||||
if serial:
|
||||
serials_found.append(serial)
|
||||
boards_found = board_pattern.findall(output)
|
||||
|
||||
for i, serial in enumerate(serials_found):
|
||||
board_name = boards_found[i] if i < len(boards_found) else 'HackRF'
|
||||
@@ -394,11 +398,11 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
))
|
||||
|
||||
# Fallback: check if any HackRF found without serial
|
||||
if not devices and re.search(r'Found\s+HackRF', output, re.IGNORECASE):
|
||||
board_match = board_pattern.search(output)
|
||||
board_name = board_match.group(1) if board_match else 'HackRF'
|
||||
devices.append(SDRDevice(
|
||||
sdr_type=SDRType.HACKRF,
|
||||
if not devices and re.search(r'Found\s+HackRF', output, re.IGNORECASE):
|
||||
board_match = board_pattern.search(output)
|
||||
board_name = board_match.group(1) if board_match else 'HackRF'
|
||||
devices.append(SDRDevice(
|
||||
sdr_type=SDRType.HACKRF,
|
||||
index=0,
|
||||
name=board_name,
|
||||
serial='Unknown',
|
||||
@@ -414,7 +418,7 @@ def detect_hackrf_devices() -> list[SDRDevice]:
|
||||
return devices
|
||||
|
||||
|
||||
def probe_rtlsdr_device(device_index: int) -> str | None:
|
||||
def probe_rtlsdr_device(device_index: int) -> str | None:
|
||||
"""Probe whether an RTL-SDR device is available at the USB level.
|
||||
|
||||
Runs a quick ``rtl_test`` invocation targeting a single device to
|
||||
@@ -428,11 +432,11 @@ def probe_rtlsdr_device(device_index: int) -> str | None:
|
||||
An error message string if the device cannot be opened,
|
||||
or ``None`` if the device is available.
|
||||
"""
|
||||
rtl_test_path = get_tool_path('rtl_test')
|
||||
if not rtl_test_path:
|
||||
# Can't probe without rtl_test — let the caller proceed and
|
||||
# surface errors from the actual decoder process instead.
|
||||
return None
|
||||
rtl_test_path = get_tool_path('rtl_test')
|
||||
if not rtl_test_path:
|
||||
# Can't probe without rtl_test — let the caller proceed and
|
||||
# surface errors from the actual decoder process instead.
|
||||
return None
|
||||
|
||||
try:
|
||||
import os
|
||||
@@ -449,11 +453,11 @@ def probe_rtlsdr_device(device_index: int) -> str | None:
|
||||
# Use Popen with early termination instead of run() with full timeout.
|
||||
# rtl_test prints device info to stderr quickly, then keeps running
|
||||
# its test loop. We kill it as soon as we see success or failure.
|
||||
proc = subprocess.Popen(
|
||||
[rtl_test_path, '-d', str(device_index), '-t'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
proc = subprocess.Popen(
|
||||
[rtl_test_path, '-d', str(device_index), '-t'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
env=env,
|
||||
)
|
||||
|
||||
@@ -572,6 +576,16 @@ def detect_all_devices(force: bool = False) -> list[SDRDevice]:
|
||||
return devices
|
||||
|
||||
|
||||
def get_cached_devices() -> list[SDRDevice] | None:
|
||||
"""Return the cached device list without probing hardware.
|
||||
|
||||
Returns None if no cached data is available (never probed).
|
||||
"""
|
||||
if _all_devices_cache_ts == 0.0:
|
||||
return None
|
||||
return list(_all_devices_cache)
|
||||
|
||||
|
||||
def invalidate_device_cache() -> None:
|
||||
"""Clear the all-devices cache so the next call re-probes hardware."""
|
||||
global _all_devices_cache, _all_devices_cache_ts
|
||||
|
||||
Reference in New Issue
Block a user