Implement TSCM device selection and actual scanning

- Add /tscm/devices endpoint to list available WiFi interfaces,
  Bluetooth adapters, and SDR devices
- Add _scan_wifi_networks() for actual WiFi scanning (macOS/Linux)
- Add _scan_bluetooth_devices() for actual Bluetooth scanning
- Update _run_sweep() to perform real scans with selected interfaces
- Add severity_counts tracking in progress events
- Fix frontend to correctly access device and severity data

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-01-14 13:23:26 +00:00
parent de13d5ea74
commit 570710c556
2 changed files with 526 additions and 41 deletions
+421 -22
View File
@@ -182,6 +182,11 @@ def start_sweep():
bt_enabled = data.get('bluetooth', True)
rf_enabled = data.get('rf', True)
# Get interface selections
wifi_interface = data.get('wifi_interface', '')
bt_interface = data.get('bt_interface', '')
sdr_device = data.get('sdr_device')
# Check for available devices
devices = _check_available_devices(wifi_enabled, bt_enabled, rf_enabled)
@@ -215,7 +220,8 @@ def start_sweep():
# Start sweep thread
_sweep_thread = threading.Thread(
target=_run_sweep,
args=(sweep_type, baseline_id, wifi_enabled, bt_enabled, rf_enabled),
args=(sweep_type, baseline_id, wifi_enabled, bt_enabled, rf_enabled,
wifi_interface, bt_interface, sdr_device),
daemon=True
)
_sweep_thread.start()
@@ -298,12 +304,381 @@ def sweep_stream():
)
@tscm_bp.route('/devices')
def get_tscm_devices():
"""Get available scanning devices for TSCM sweeps."""
import platform
import shutil
import subprocess
devices = {
'wifi_interfaces': [],
'bt_adapters': [],
'sdr_devices': []
}
# Detect WiFi interfaces
if platform.system() == 'Darwin': # macOS
try:
result = subprocess.run(
['networksetup', '-listallhardwareports'],
capture_output=True, text=True, timeout=5
)
lines = result.stdout.split('\n')
for i, line in enumerate(lines):
if 'Wi-Fi' in line or 'AirPort' in line:
for j in range(i + 1, min(i + 3, len(lines))):
if 'Device:' in lines[j]:
device = lines[j].split('Device:')[1].strip()
devices['wifi_interfaces'].append({
'name': device,
'type': 'internal',
'monitor_capable': False
})
break
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
else: # Linux
try:
result = subprocess.run(
['iw', 'dev'],
capture_output=True, text=True, timeout=5
)
current_iface = None
for line in result.stdout.split('\n'):
line = line.strip()
if line.startswith('Interface'):
current_iface = line.split()[1]
elif current_iface and 'type' in line:
devices['wifi_interfaces'].append({
'name': current_iface,
'type': line.split()[-1],
'monitor_capable': True
})
current_iface = None
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
# Fall back to iwconfig
try:
result = subprocess.run(
['iwconfig'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.split('\n'):
if 'IEEE 802.11' in line:
iface = line.split()[0]
devices['wifi_interfaces'].append({
'name': iface,
'type': 'managed',
'monitor_capable': True
})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
# Detect Bluetooth adapters
if platform.system() == 'Linux':
try:
result = subprocess.run(
['hciconfig'],
capture_output=True, text=True, timeout=5
)
import re
blocks = re.split(r'(?=^hci\d+:)', result.stdout, flags=re.MULTILINE)
for block in blocks:
if block.strip():
first_line = block.split('\n')[0]
match = re.match(r'(hci\d+):', first_line)
if match:
iface_name = match.group(1)
is_up = 'UP RUNNING' in block or '\tUP ' in block
devices['bt_adapters'].append({
'name': iface_name,
'type': 'hci',
'status': 'up' if is_up else 'down'
})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
# Try bluetoothctl as fallback
try:
result = subprocess.run(
['bluetoothctl', 'list'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.split('\n'):
if 'Controller' in line:
# Format: Controller XX:XX:XX:XX:XX:XX Name
parts = line.split()
if len(parts) >= 2:
devices['bt_adapters'].append({
'name': parts[1],
'type': 'controller',
'status': 'available'
})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
elif platform.system() == 'Darwin':
# macOS has built-in Bluetooth
devices['bt_adapters'].append({
'name': 'default',
'type': 'macos',
'status': 'available'
})
# Detect SDR devices
try:
from utils.sdr import SDRFactory
sdr_list = SDRFactory.detect_devices()
for i, sdr in enumerate(sdr_list):
devices['sdr_devices'].append({
'index': i,
'name': sdr.get('name', f'SDR Device {i}'),
'type': sdr.get('type', 'unknown'),
'serial': sdr.get('serial', '')
})
except ImportError:
pass
except Exception as e:
logger.warning(f"Error detecting SDR devices: {e}")
return jsonify({'status': 'success', 'devices': devices})
def _scan_wifi_networks(interface: str) -> list[dict]:
"""Scan for WiFi networks using system tools."""
import platform
import re
import subprocess
networks = []
if platform.system() == 'Darwin':
# macOS: Use airport utility
airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport'
try:
result = subprocess.run(
[airport_path, '-s'],
capture_output=True, text=True, timeout=15
)
# Parse airport output
# Format: SSID BSSID RSSI CHANNEL HT CC SECURITY
lines = result.stdout.strip().split('\n')
for line in lines[1:]: # Skip header
if not line.strip():
continue
# Parse the line - format is space-separated but SSID can have spaces
parts = line.split()
if len(parts) >= 7:
# BSSID is always XX:XX:XX:XX:XX:XX format
bssid_idx = None
for i, p in enumerate(parts):
if re.match(r'^[0-9a-fA-F:]{17}$', p):
bssid_idx = i
break
if bssid_idx is not None:
ssid = ' '.join(parts[:bssid_idx]) if bssid_idx > 0 else '[Hidden]'
bssid = parts[bssid_idx]
rssi = parts[bssid_idx + 1] if len(parts) > bssid_idx + 1 else '-100'
channel = parts[bssid_idx + 2] if len(parts) > bssid_idx + 2 else '0'
security = ' '.join(parts[bssid_idx + 5:]) if len(parts) > bssid_idx + 5 else ''
networks.append({
'bssid': bssid.upper(),
'essid': ssid or '[Hidden]',
'power': rssi,
'channel': channel,
'privacy': security
})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
logger.warning(f"macOS WiFi scan failed: {e}")
else:
# Linux: Try iwlist scan
iface = interface or 'wlan0'
try:
result = subprocess.run(
['iwlist', iface, 'scan'],
capture_output=True, text=True, timeout=30
)
current_network = {}
for line in result.stdout.split('\n'):
line = line.strip()
if 'Cell' in line and 'Address:' in line:
if current_network.get('bssid'):
networks.append(current_network)
bssid = line.split('Address:')[1].strip()
current_network = {'bssid': bssid.upper(), 'essid': '[Hidden]'}
elif 'ESSID:' in line:
essid = line.split('ESSID:')[1].strip().strip('"')
current_network['essid'] = essid or '[Hidden]'
elif 'Channel:' in line:
channel = line.split('Channel:')[1].strip()
current_network['channel'] = channel
elif 'Signal level=' in line:
match = re.search(r'Signal level[=:]?\s*(-?\d+)', line)
if match:
current_network['power'] = match.group(1)
elif 'Encryption key:' in line:
encrypted = 'on' in line.lower()
current_network['encrypted'] = encrypted
elif 'WPA' in line or 'WPA2' in line:
current_network['privacy'] = 'WPA2' if 'WPA2' in line else 'WPA'
if current_network.get('bssid'):
networks.append(current_network)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
logger.warning(f"Linux WiFi scan failed: {e}")
return networks
def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]:
"""Scan for Bluetooth devices using system tools."""
import platform
import os
import pty
import re
import select
import subprocess
devices = []
seen_macs = set()
if platform.system() == 'Darwin':
# macOS: Use system_profiler for basic Bluetooth info
try:
result = subprocess.run(
['system_profiler', 'SPBluetoothDataType', '-json'],
capture_output=True, text=True, timeout=15
)
import json
data = json.loads(result.stdout)
bt_data = data.get('SPBluetoothDataType', [{}])[0]
# Get connected/paired devices
for section in ['device_connected', 'device_title']:
section_data = bt_data.get(section, {})
if isinstance(section_data, dict):
for name, info in section_data.items():
if isinstance(info, dict):
mac = info.get('device_address', '')
if mac and mac not in seen_macs:
seen_macs.add(mac)
devices.append({
'mac': mac.upper(),
'name': name,
'type': info.get('device_minorType', 'unknown'),
'connected': section == 'device_connected'
})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError, json.JSONDecodeError) as e:
logger.warning(f"macOS Bluetooth scan failed: {e}")
else:
# Linux: Use bluetoothctl or hcitool
iface = interface or 'hci0'
# Try bluetoothctl first
try:
master_fd, slave_fd = pty.openpty()
process = subprocess.Popen(
['bluetoothctl'],
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True
)
os.close(slave_fd)
# Start scanning
time.sleep(0.3)
os.write(master_fd, b'power on\n')
time.sleep(0.3)
os.write(master_fd, b'scan on\n')
# Collect devices for specified duration
scan_end = time.time() + duration
buffer = ''
while time.time() < scan_end and _sweep_running:
readable, _, _ = select.select([master_fd], [], [], 1.0)
if readable:
try:
data = os.read(master_fd, 4096)
if not data:
break
buffer += data.decode('utf-8', errors='replace')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = re.sub(r'\x1b\[[0-9;]*m', '', line).strip()
if 'Device' in line:
match = re.search(
r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:'
r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})\s*(.*)',
line
)
if match:
mac = match.group(1).upper()
name = match.group(2).strip()
# Remove RSSI from name if present
name = re.sub(r'\s*RSSI:\s*-?\d+\s*', '', name).strip()
if mac not in seen_macs:
seen_macs.add(mac)
devices.append({
'mac': mac,
'name': name or '[Unknown]'
})
except OSError:
break
# Stop scanning and cleanup
try:
os.write(master_fd, b'scan off\n')
time.sleep(0.2)
os.write(master_fd, b'quit\n')
except OSError:
pass
process.terminate()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
process.kill()
try:
os.close(master_fd)
except OSError:
pass
except (FileNotFoundError, subprocess.SubprocessError) as e:
logger.warning(f"bluetoothctl scan failed: {e}")
# Fallback to hcitool
try:
result = subprocess.run(
['hcitool', '-i', iface, 'scan'],
capture_output=True, text=True, timeout=duration + 5
)
for line in result.stdout.split('\n'):
parts = line.split()
if len(parts) >= 1 and ':' in parts[0]:
mac = parts[0].upper()
name = ' '.join(parts[1:]) if len(parts) > 1 else '[Unknown]'
if mac not in seen_macs:
seen_macs.add(mac)
devices.append({'mac': mac, 'name': name})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
logger.warning(f"hcitool scan failed: {e}")
return devices
def _run_sweep(
sweep_type: str,
baseline_id: int | None,
wifi_enabled: bool,
bt_enabled: bool,
rf_enabled: bool
rf_enabled: bool,
wifi_interface: str = '',
bt_interface: str = '',
sdr_device: int | None = None
) -> None:
"""
Run the TSCM sweep in a background thread.
@@ -337,41 +712,62 @@ def _run_sweep(
# Collect and analyze data
threats_found = 0
all_wifi = []
all_bt = []
severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
all_wifi = {} # Use dict for deduplication by BSSID
all_bt = {} # Use dict for deduplication by MAC
all_rf = []
start_time = time.time()
last_wifi_scan = 0
last_bt_scan = 0
wifi_scan_interval = 15 # Scan WiFi every 15 seconds
bt_scan_interval = 20 # Scan Bluetooth every 20 seconds
while _sweep_running and (time.time() - start_time) < duration:
# Import app module to access shared data stores
try:
import app as app_module
current_time = time.time()
# Collect WiFi data
if wifi_enabled and hasattr(app_module, 'wifi_networks'):
wifi_data = list(app_module.wifi_networks.data.values())
for device in wifi_data:
if device not in all_wifi:
all_wifi.append(device)
threat = detector.analyze_wifi_device(device)
# Perform WiFi scan
if wifi_enabled and (current_time - last_wifi_scan) >= wifi_scan_interval:
try:
wifi_networks = _scan_wifi_networks(wifi_interface)
for network in wifi_networks:
bssid = network.get('bssid', '')
if bssid and bssid not in all_wifi:
all_wifi[bssid] = network
# Analyze for threats
threat = detector.analyze_wifi_device(network)
if threat:
_handle_threat(threat)
threats_found += 1
sev = threat.get('severity', 'low').lower()
if sev in severity_counts:
severity_counts[sev] += 1
last_wifi_scan = current_time
except Exception as e:
logger.error(f"WiFi scan error: {e}")
# Collect Bluetooth data
if bt_enabled and hasattr(app_module, 'bt_devices'):
bt_data = list(app_module.bt_devices.data.values())
for device in bt_data:
if device not in all_bt:
all_bt.append(device)
# Perform Bluetooth scan
if bt_enabled and (current_time - last_bt_scan) >= bt_scan_interval:
try:
bt_devices = _scan_bluetooth_devices(bt_interface, duration=8)
for device in bt_devices:
mac = device.get('mac', '')
if mac and mac not in all_bt:
all_bt[mac] = device
# Analyze for threats
threat = detector.analyze_bt_device(device)
if threat:
_handle_threat(threat)
threats_found += 1
sev = threat.get('severity', 'low').lower()
if sev in severity_counts:
severity_counts[sev] += 1
last_bt_scan = current_time
except Exception as e:
logger.error(f"Bluetooth scan error: {e}")
except ImportError:
logger.warning("Could not import app module for data collection")
# RF scanning would go here if SDR is available
# For now, RF scanning is not implemented
# Update progress
elapsed = time.time() - start_time
@@ -385,6 +781,7 @@ def _run_sweep(
'bt_count': len(all_bt),
'rf_count': len(all_rf),
'threats_found': threats_found,
'severity_counts': severity_counts,
})
time.sleep(2) # Update every 2 seconds
@@ -398,6 +795,7 @@ def _run_sweep(
'wifi_devices': len(all_wifi),
'bt_devices': len(all_bt),
'rf_signals': len(all_rf),
'severity_counts': severity_counts,
},
threats_found=threats_found,
completed=True
@@ -409,6 +807,7 @@ def _run_sweep(
'wifi_count': len(all_wifi),
'bt_count': len(all_bt),
'rf_count': len(all_rf),
'severity_counts': severity_counts,
})
except Exception as e:
+105 -19
View File
@@ -1050,20 +1050,36 @@
<div class="section">
<h3>Scan Sources</h3>
<div class="checkbox-group">
<label>
<input type="checkbox" id="tscmWifiEnabled" checked>
WiFi Networks
</label>
<label>
<input type="checkbox" id="tscmBtEnabled" checked>
Bluetooth Devices
</label>
<label>
<input type="checkbox" id="tscmRfEnabled" checked>
RF Signals
</label>
<div class="form-group" style="margin-bottom: 8px;">
<div style="display: flex; align-items: center; gap: 8px;">
<input type="checkbox" id="tscmWifiEnabled" checked style="margin: 0;">
<label for="tscmWifiEnabled" style="flex: 1; margin: 0;">WiFi Networks</label>
</div>
<select id="tscmWifiInterface" style="width: 100%; margin-top: 4px; font-size: 11px;">
<option value="">Select WiFi interface...</option>
</select>
</div>
<div class="form-group" style="margin-bottom: 8px;">
<div style="display: flex; align-items: center; gap: 8px;">
<input type="checkbox" id="tscmBtEnabled" checked style="margin: 0;">
<label for="tscmBtEnabled" style="flex: 1; margin: 0;">Bluetooth Devices</label>
</div>
<select id="tscmBtInterface" style="width: 100%; margin-top: 4px; font-size: 11px;">
<option value="">Select Bluetooth adapter...</option>
</select>
</div>
<div class="form-group" style="margin-bottom: 8px;">
<div style="display: flex; align-items: center; gap: 8px;">
<input type="checkbox" id="tscmRfEnabled" style="margin: 0;">
<label for="tscmRfEnabled" style="flex: 1; margin: 0;">RF Signals</label>
</div>
<select id="tscmSdrDevice" style="width: 100%; margin-top: 4px; font-size: 11px;">
<option value="">Select SDR device...</option>
</select>
</div>
<button class="preset-btn" onclick="refreshTscmDevices()" style="width: 100%; margin-top: 8px; font-size: 10px;">
🔄 Refresh Devices
</button>
</div>
<div class="section">
@@ -2953,6 +2969,7 @@
// Initialize TSCM mode when selected
if (mode === 'tscm') {
loadTscmBaselines();
refreshTscmDevices();
}
// Show/hide Device Intelligence for modes that use it (not for satellite/aircraft/tscm)
@@ -9615,6 +9632,68 @@
let tscmBtDevices = [];
let isRecordingBaseline = false;
async function refreshTscmDevices() {
// Fetch available interfaces for TSCM scanning
try {
const response = await fetch('/tscm/devices');
const data = await response.json();
const devices = data.devices || {};
// Populate WiFi interfaces
const wifiSelect = document.getElementById('tscmWifiInterface');
wifiSelect.innerHTML = '<option value="">Select WiFi interface...</option>';
if (devices.wifi_interfaces && devices.wifi_interfaces.length > 0) {
devices.wifi_interfaces.forEach(iface => {
const opt = document.createElement('option');
opt.value = iface.name;
opt.textContent = `${iface.name}${iface.type ? ' (' + iface.type + ')' : ''}`;
wifiSelect.appendChild(opt);
});
// Auto-select first interface
if (devices.wifi_interfaces.length > 0) {
wifiSelect.value = devices.wifi_interfaces[0].name;
}
} else {
wifiSelect.innerHTML = '<option value="">No WiFi interfaces found</option>';
}
// Populate Bluetooth adapters
const btSelect = document.getElementById('tscmBtInterface');
btSelect.innerHTML = '<option value="">Select Bluetooth adapter...</option>';
if (devices.bt_adapters && devices.bt_adapters.length > 0) {
devices.bt_adapters.forEach(adapter => {
const opt = document.createElement('option');
opt.value = adapter.name;
opt.textContent = `${adapter.name}${adapter.status ? ' [' + adapter.status + ']' : ''}`;
btSelect.appendChild(opt);
});
// Auto-select first adapter
if (devices.bt_adapters.length > 0) {
btSelect.value = devices.bt_adapters[0].name;
}
} else {
btSelect.innerHTML = '<option value="">No Bluetooth adapters found</option>';
}
// Populate SDR devices
const sdrSelect = document.getElementById('tscmSdrDevice');
sdrSelect.innerHTML = '<option value="">Select SDR device...</option>';
if (devices.sdr_devices && devices.sdr_devices.length > 0) {
devices.sdr_devices.forEach(dev => {
const opt = document.createElement('option');
opt.value = dev.index;
opt.textContent = `${dev.index}: ${dev.name || 'SDR Device'}`;
sdrSelect.appendChild(opt);
});
} else {
sdrSelect.innerHTML = '<option value="">No SDR devices found</option>';
}
} catch (e) {
console.error('Failed to refresh TSCM devices:', e);
}
}
async function loadTscmBaselines() {
try {
const response = await fetch('/tscm/baselines');
@@ -9640,6 +9719,9 @@
const wifiEnabled = document.getElementById('tscmWifiEnabled').checked;
const btEnabled = document.getElementById('tscmBtEnabled').checked;
const rfEnabled = document.getElementById('tscmRfEnabled').checked;
const wifiInterface = document.getElementById('tscmWifiInterface').value;
const btInterface = document.getElementById('tscmBtInterface').value;
const sdrDevice = document.getElementById('tscmSdrDevice').value;
// Clear any previous warnings
document.getElementById('tscmDeviceWarnings').style.display = 'none';
@@ -9654,7 +9736,10 @@
baseline_id: baselineId ? parseInt(baselineId) : null,
wifi: wifiEnabled,
bluetooth: btEnabled,
rf: rfEnabled
rf: rfEnabled,
wifi_interface: wifiInterface,
bt_interface: btInterface,
sdr_device: sdrDevice ? parseInt(sdrDevice) : null
})
});
@@ -9793,15 +9878,16 @@
: `SCANNING ${data.wifi_count}W ${data.bt_count}B`;
document.getElementById('tscmProgressLabel').textContent = statusText;
// Update counts in sidebar
// Update counts in sidebar (from severity_counts object)
const counts = data.severity_counts || {};
const criticalEl = document.querySelector('#tscmThreatSummary .threat-card.critical .count');
const highEl = document.querySelector('#tscmThreatSummary .threat-card.high .count');
const mediumEl = document.querySelector('#tscmThreatSummary .threat-card.medium .count');
const lowEl = document.querySelector('#tscmThreatSummary .threat-card.low .count');
if (criticalEl) criticalEl.textContent = data.critical || 0;
if (highEl) highEl.textContent = data.high || 0;
if (mediumEl) mediumEl.textContent = data.medium || 0;
if (lowEl) lowEl.textContent = data.low || 0;
if (criticalEl) criticalEl.textContent = counts.critical || 0;
if (highEl) highEl.textContent = counts.high || 0;
if (mediumEl) mediumEl.textContent = counts.medium || 0;
if (lowEl) lowEl.textContent = counts.low || 0;
}
function addTscmThreat(threat) {