Add RF scanning and improve TSCM device naming

- Add _scan_rf_signals() function using rtl_power to scan:
  - FM broadcast band (88-108 MHz) for potential bugs
  - 315/433/868/915 MHz ISM bands
  - 1.2 GHz video transmitter band
  - 2.4 GHz ISM band
- Integrate RF scanning into sweep with 60-second interval
- Add display_name field for all devices with friendly names
- Update frontend to use display_name in dropdowns
- Improve scan status display: '14 WiFi | 20 BT | 3 RF' instead of '14w 20b'
- Auto-select first SDR device when available
This commit is contained in:
Smittix
2026-01-14 13:40:13 +00:00
parent 153336d757
commit 0acbf87dde
2 changed files with 224 additions and 19 deletions

View File

@@ -327,11 +327,14 @@ def get_tscm_devices():
lines = result.stdout.split('\n')
for i, line in enumerate(lines):
if 'Wi-Fi' in line or 'AirPort' in line:
# Get the hardware port name (e.g., "Wi-Fi")
port_name = line.replace('Hardware Port:', '').strip()
for j in range(i + 1, min(i + 3, len(lines))):
if 'Device:' in lines[j]:
device = lines[j].split('Device:')[1].strip()
devices['wifi_interfaces'].append({
'name': device,
'display_name': f'{port_name} ({device})',
'type': 'internal',
'monitor_capable': False
})
@@ -350,9 +353,11 @@ def get_tscm_devices():
if line.startswith('Interface'):
current_iface = line.split()[1]
elif current_iface and 'type' in line:
iface_type = line.split()[-1]
devices['wifi_interfaces'].append({
'name': current_iface,
'type': line.split()[-1],
'display_name': f'Wireless ({current_iface}) - {iface_type}',
'type': iface_type,
'monitor_capable': True
})
current_iface = None
@@ -368,6 +373,7 @@ def get_tscm_devices():
iface = line.split()[0]
devices['wifi_interfaces'].append({
'name': iface,
'display_name': f'Wireless ({iface})',
'type': 'managed',
'monitor_capable': True
})
@@ -383,7 +389,7 @@ def get_tscm_devices():
)
import re
blocks = re.split(r'(?=^hci\d+:)', result.stdout, flags=re.MULTILINE)
for block in blocks:
for idx, block in enumerate(blocks):
if block.strip():
first_line = block.split('\n')[0]
match = re.match(r'(hci\d+):', first_line)
@@ -392,6 +398,7 @@ def get_tscm_devices():
is_up = 'UP RUNNING' in block or '\tUP ' in block
devices['bt_adapters'].append({
'name': iface_name,
'display_name': f'Bluetooth Adapter ({iface_name})',
'type': 'hci',
'status': 'up' if is_up else 'down'
})
@@ -406,21 +413,44 @@ def get_tscm_devices():
if 'Controller' in line:
# Format: Controller XX:XX:XX:XX:XX:XX Name
parts = line.split()
if len(parts) >= 2:
if len(parts) >= 3:
addr = parts[1]
name = ' '.join(parts[2:]) if len(parts) > 2 else 'Bluetooth'
devices['bt_adapters'].append({
'name': parts[1],
'name': addr,
'display_name': f'{name} ({addr[-8:]})',
'type': 'controller',
'status': 'available'
})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
elif platform.system() == 'Darwin':
# macOS has built-in Bluetooth
devices['bt_adapters'].append({
'name': 'default',
'type': 'macos',
'status': 'available'
})
# macOS has built-in Bluetooth - get more info via system_profiler
try:
result = subprocess.run(
['system_profiler', 'SPBluetoothDataType'],
capture_output=True, text=True, timeout=10
)
# Extract controller info
bt_name = 'Built-in Bluetooth'
bt_addr = ''
for line in result.stdout.split('\n'):
if 'Address:' in line:
bt_addr = line.split('Address:')[1].strip()
break
devices['bt_adapters'].append({
'name': 'default',
'display_name': f'{bt_name}' + (f' ({bt_addr[-8:]})' if bt_addr else ''),
'type': 'macos',
'status': 'available'
})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
devices['bt_adapters'].append({
'name': 'default',
'display_name': 'Built-in Bluetooth',
'type': 'macos',
'status': 'available'
})
# Detect SDR devices
try:
@@ -428,10 +458,16 @@ def get_tscm_devices():
sdr_list = SDRFactory.detect_devices()
for sdr in sdr_list:
# SDRDevice is a dataclass with attributes, not a dict
sdr_type_name = sdr.sdr_type.value if hasattr(sdr.sdr_type, 'value') else str(sdr.sdr_type)
# Create a friendly display name
display_name = sdr.name
if sdr.serial and sdr.serial not in ('N/A', 'Unknown'):
display_name = f'{sdr.name} (SN: {sdr.serial[-8:]})'
devices['sdr_devices'].append({
'index': sdr.index,
'name': sdr.name,
'type': sdr.sdr_type.value if hasattr(sdr.sdr_type, 'value') else str(sdr.sdr_type),
'display_name': display_name,
'type': sdr_type_name,
'serial': sdr.serial,
'driver': sdr.driver
})
@@ -672,6 +708,136 @@ def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]:
return devices
def _scan_rf_signals(sdr_device: int | None, duration: int = 30) -> list[dict]:
"""
Scan for RF signals using SDR (rtl_power).
Scans common surveillance frequency bands:
- 88-108 MHz: FM broadcast (potential FM bugs)
- 315 MHz: Common ISM band (wireless devices)
- 433 MHz: ISM band (European wireless devices, car keys)
- 868 MHz: European ISM band
- 915 MHz: US ISM band
- 1.2 GHz: Video transmitters
- 2.4 GHz: WiFi, Bluetooth, video transmitters
"""
import os
import shutil
import subprocess
import tempfile
signals = []
if not shutil.which('rtl_power'):
logger.warning("rtl_power not found, RF scanning unavailable")
return signals
# Define frequency bands to scan (in Hz) - focus on common bug frequencies
# Format: (start_freq, end_freq, bin_size, description)
scan_bands = [
(88000000, 108000000, 100000, 'FM Broadcast'), # FM bugs
(315000000, 316000000, 10000, '315 MHz ISM'), # US ISM
(433000000, 434000000, 10000, '433 MHz ISM'), # EU ISM
(868000000, 869000000, 10000, '868 MHz ISM'), # EU ISM
(902000000, 928000000, 100000, '915 MHz ISM'), # US ISM
(1200000000, 1300000000, 100000, '1.2 GHz Video'), # Video TX
(2400000000, 2500000000, 500000, '2.4 GHz ISM'), # WiFi/BT/Video
]
# Create temp file for output
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp:
tmp_path = tmp.name
try:
# Build device argument
device_arg = ['-d', str(sdr_device if sdr_device is not None else 0)]
# Scan each band and look for strong signals
for start_freq, end_freq, bin_size, band_name in scan_bands:
if not _sweep_running:
break
try:
# Run rtl_power for a quick sweep of this band
cmd = [
'rtl_power',
'-f', f'{start_freq}:{end_freq}:{bin_size}',
'-g', '40', # Gain
'-i', '1', # Integration interval (1 second)
'-1', # Single shot mode
'-c', '20%', # Crop 20% of edges
] + device_arg + [tmp_path]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=15
)
# Parse the CSV output
if os.path.exists(tmp_path) and os.path.getsize(tmp_path) > 0:
with open(tmp_path, 'r') as f:
for line in f:
parts = line.strip().split(',')
if len(parts) >= 7:
try:
# CSV format: date, time, hz_low, hz_high, hz_step, samples, db_values...
hz_low = int(parts[2])
hz_high = int(parts[3])
hz_step = float(parts[4])
db_values = [float(x) for x in parts[6:] if x.strip()]
# Find peaks above noise floor (typically -60 dBm is strong)
noise_floor = sum(db_values) / len(db_values) if db_values else -100
threshold = noise_floor + 15 # Signal must be 15dB above noise
for idx, db in enumerate(db_values):
if db > threshold and db > -50: # Strong signal
freq_hz = hz_low + (idx * hz_step)
freq_mhz = freq_hz / 1000000
signals.append({
'frequency': freq_mhz,
'frequency_hz': freq_hz,
'power': db,
'band': band_name,
'noise_floor': noise_floor,
'signal_strength': db - noise_floor
})
except (ValueError, IndexError):
continue
# Clear file for next band
open(tmp_path, 'w').close()
except subprocess.TimeoutExpired:
logger.warning(f"RF scan timeout for band {band_name}")
except Exception as e:
logger.warning(f"RF scan error for band {band_name}: {e}")
finally:
# Cleanup temp file
try:
os.unlink(tmp_path)
except OSError:
pass
# Deduplicate nearby frequencies (within 100kHz)
if signals:
signals.sort(key=lambda x: x['frequency'])
deduped = [signals[0]]
for sig in signals[1:]:
if sig['frequency'] - deduped[-1]['frequency'] > 0.1: # 100 kHz
deduped.append(sig)
elif sig['power'] > deduped[-1]['power']:
deduped[-1] = sig # Keep stronger signal
signals = deduped
logger.info(f"RF scan found {len(signals)} signals")
return signals
def _run_sweep(
sweep_type: str,
baseline_id: int | None,
@@ -722,8 +888,10 @@ def _run_sweep(
start_time = time.time()
last_wifi_scan = 0
last_bt_scan = 0
last_rf_scan = 0
wifi_scan_interval = 15 # Scan WiFi every 15 seconds
bt_scan_interval = 20 # Scan Bluetooth every 20 seconds
rf_scan_interval = 60 # Scan RF every 60 seconds (it's slower)
while _sweep_running and (time.time() - start_time) < duration:
current_time = time.time()
@@ -768,8 +936,32 @@ def _run_sweep(
except Exception as e:
logger.error(f"Bluetooth scan error: {e}")
# RF scanning would go here if SDR is available
# For now, RF scanning is not implemented
# Perform RF scan using SDR
if rf_enabled and sdr_device is not None and (current_time - last_rf_scan) >= rf_scan_interval:
try:
_emit_event('sweep_progress', {
'progress': min(100, int(((current_time - start_time) / duration) * 100)),
'status': 'Scanning RF spectrum...',
'wifi_count': len(all_wifi),
'bt_count': len(all_bt),
'rf_count': len(all_rf),
})
rf_signals = _scan_rf_signals(sdr_device)
for signal in rf_signals:
freq_key = f"{signal['frequency']:.3f}"
if freq_key not in [f"{s['frequency']:.3f}" for s in all_rf]:
all_rf.append(signal)
# Analyze RF signal for threats
threat = detector.analyze_rf_signal(signal)
if threat:
_handle_threat(threat)
threats_found += 1
sev = threat.get('severity', 'low').lower()
if sev in severity_counts:
severity_counts[sev] += 1
last_rf_scan = current_time
except Exception as e:
logger.error(f"RF scan error: {e}")
# Update progress
elapsed = time.time() - start_time