mirror of
https://github.com/smittix/intercept.git
synced 2026-07-02 06:48:58 -07:00
Update SDR device claims
This commit is contained in:
@@ -235,62 +235,77 @@ cleanup_manager.register(deauth_alerts)
|
||||
# ============================================
|
||||
# SDR DEVICE REGISTRY
|
||||
# ============================================
|
||||
# Tracks which mode is using which SDR device to prevent conflicts
|
||||
# Key: device_index (int), Value: mode_name (str)
|
||||
sdr_device_registry: dict[int, str] = {}
|
||||
sdr_device_registry_lock = threading.Lock()
|
||||
|
||||
|
||||
def claim_sdr_device(device_index: int, mode_name: str) -> str | None:
|
||||
"""Claim an SDR device for a mode.
|
||||
# Tracks which mode is using which SDR device to prevent conflicts
|
||||
# Key: device_index (int), Value: {sdr_type: mode_name}
|
||||
sdr_device_registry: dict[int, dict[str, str]] = {}
|
||||
sdr_device_registry_lock = threading.Lock()
|
||||
|
||||
|
||||
def claim_sdr_device(device_index: int, mode_name: str, sdr_type: str = 'rtlsdr') -> str | None:
|
||||
"""Claim an SDR device for a mode.
|
||||
|
||||
Checks the in-app registry first, then probes the USB device to
|
||||
catch stale handles held by external processes (e.g. a leftover
|
||||
rtl_fm from a previous crash).
|
||||
|
||||
Args:
|
||||
device_index: The SDR device index to claim
|
||||
mode_name: Name of the mode claiming the device (e.g., 'sensor', 'rtlamr')
|
||||
device_index: The SDR device index to claim
|
||||
mode_name: Name of the mode claiming the device (e.g., 'sensor', 'rtlamr')
|
||||
sdr_type: SDR hardware type (e.g., 'rtlsdr', 'hackrf')
|
||||
|
||||
Returns:
|
||||
Error message if device is in use, None if successfully claimed
|
||||
"""
|
||||
with sdr_device_registry_lock:
|
||||
if device_index in sdr_device_registry:
|
||||
in_use_by = sdr_device_registry[device_index]
|
||||
return f'SDR device {device_index} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.'
|
||||
|
||||
# Probe the USB device to catch external processes holding the handle
|
||||
try:
|
||||
from utils.sdr.detection import probe_rtlsdr_device
|
||||
usb_error = probe_rtlsdr_device(device_index)
|
||||
if usb_error:
|
||||
return usb_error
|
||||
except Exception:
|
||||
pass # If probe fails, let the caller proceed normally
|
||||
|
||||
sdr_device_registry[device_index] = mode_name
|
||||
return None
|
||||
|
||||
|
||||
def release_sdr_device(device_index: int) -> None:
|
||||
"""Release an SDR device from the registry.
|
||||
sdr_type_key = str(sdr_type or 'rtlsdr').lower()
|
||||
|
||||
with sdr_device_registry_lock:
|
||||
device_entry = sdr_device_registry.get(device_index, {})
|
||||
if sdr_type_key in device_entry:
|
||||
in_use_by = device_entry[sdr_type_key]
|
||||
return f'SDR device {device_index} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.'
|
||||
|
||||
# Probe the USB device to catch external processes holding the handle
|
||||
# Only relevant for RTL-SDR devices
|
||||
if sdr_type_key == 'rtlsdr':
|
||||
try:
|
||||
from utils.sdr.detection import probe_rtlsdr_device
|
||||
usb_error = probe_rtlsdr_device(device_index)
|
||||
if usb_error:
|
||||
return usb_error
|
||||
except Exception:
|
||||
pass # If probe fails, let the caller proceed normally
|
||||
|
||||
if device_index not in sdr_device_registry:
|
||||
sdr_device_registry[device_index] = {}
|
||||
sdr_device_registry[device_index][sdr_type_key] = mode_name
|
||||
return None
|
||||
|
||||
|
||||
def release_sdr_device(device_index: int, sdr_type: str = 'rtlsdr') -> None:
|
||||
"""Release an SDR device from the registry.
|
||||
|
||||
Args:
|
||||
device_index: The SDR device index to release
|
||||
"""
|
||||
with sdr_device_registry_lock:
|
||||
sdr_device_registry.pop(device_index, None)
|
||||
device_index: The SDR device index to release
|
||||
sdr_type: SDR hardware type (e.g., 'rtlsdr', 'hackrf')
|
||||
"""
|
||||
sdr_type_key = str(sdr_type or 'rtlsdr').lower()
|
||||
with sdr_device_registry_lock:
|
||||
entry = sdr_device_registry.get(device_index)
|
||||
if not entry:
|
||||
return
|
||||
entry.pop(sdr_type_key, None)
|
||||
if not entry:
|
||||
sdr_device_registry.pop(device_index, None)
|
||||
|
||||
|
||||
def get_sdr_device_status() -> dict[int, str]:
|
||||
"""Get current SDR device allocations.
|
||||
def get_sdr_device_status() -> dict[int, dict[str, str]]:
|
||||
"""Get current SDR device allocations.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping device indices to mode names
|
||||
"""
|
||||
with sdr_device_registry_lock:
|
||||
return dict(sdr_device_registry)
|
||||
Dictionary mapping device indices to {sdr_type: mode_name}
|
||||
"""
|
||||
with sdr_device_registry_lock:
|
||||
return {idx: dict(modes) for idx, modes in sdr_device_registry.items()}
|
||||
|
||||
|
||||
# ============================================
|
||||
@@ -388,17 +403,20 @@ def get_devices() -> Response:
|
||||
|
||||
|
||||
@app.route('/devices/status')
|
||||
def get_devices_status() -> Response:
|
||||
"""Get all SDR devices with usage status."""
|
||||
devices = SDRFactory.detect_devices()
|
||||
registry = get_sdr_device_status()
|
||||
|
||||
result = []
|
||||
for device in devices:
|
||||
d = device.to_dict()
|
||||
d['in_use'] = device.index in registry
|
||||
d['used_by'] = registry.get(device.index)
|
||||
result.append(d)
|
||||
def get_devices_status() -> Response:
|
||||
"""Get all SDR devices with usage status."""
|
||||
devices = SDRFactory.detect_devices()
|
||||
registry = get_sdr_device_status()
|
||||
|
||||
result = []
|
||||
for device in devices:
|
||||
d = device.to_dict()
|
||||
sdr_type_key = device.sdr_type.value if hasattr(device.sdr_type, 'value') else str(device.sdr_type)
|
||||
sdr_type_key = str(sdr_type_key).lower()
|
||||
device_registry = registry.get(device.index, {})
|
||||
d['in_use'] = sdr_type_key in device_registry
|
||||
d['used_by'] = device_registry.get(sdr_type_key)
|
||||
result.append(d)
|
||||
|
||||
return jsonify(result)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user