diff --git a/routes/tscm.py b/routes/tscm.py index 397c850..af25302 100644 --- a/routes/tscm.py +++ b/routes/tscm.py @@ -182,6 +182,11 @@ def start_sweep(): bt_enabled = data.get('bluetooth', True) rf_enabled = data.get('rf', True) + # Get interface selections + wifi_interface = data.get('wifi_interface', '') + bt_interface = data.get('bt_interface', '') + sdr_device = data.get('sdr_device') + # Check for available devices devices = _check_available_devices(wifi_enabled, bt_enabled, rf_enabled) @@ -215,7 +220,8 @@ def start_sweep(): # Start sweep thread _sweep_thread = threading.Thread( target=_run_sweep, - args=(sweep_type, baseline_id, wifi_enabled, bt_enabled, rf_enabled), + args=(sweep_type, baseline_id, wifi_enabled, bt_enabled, rf_enabled, + wifi_interface, bt_interface, sdr_device), daemon=True ) _sweep_thread.start() @@ -298,12 +304,381 @@ def sweep_stream(): ) +@tscm_bp.route('/devices') +def get_tscm_devices(): + """Get available scanning devices for TSCM sweeps.""" + import platform + import shutil + import subprocess + + devices = { + 'wifi_interfaces': [], + 'bt_adapters': [], + 'sdr_devices': [] + } + + # Detect WiFi interfaces + if platform.system() == 'Darwin': # macOS + try: + result = subprocess.run( + ['networksetup', '-listallhardwareports'], + capture_output=True, text=True, timeout=5 + ) + lines = result.stdout.split('\n') + for i, line in enumerate(lines): + if 'Wi-Fi' in line or 'AirPort' in line: + for j in range(i + 1, min(i + 3, len(lines))): + if 'Device:' in lines[j]: + device = lines[j].split('Device:')[1].strip() + devices['wifi_interfaces'].append({ + 'name': device, + 'type': 'internal', + 'monitor_capable': False + }) + break + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + pass + else: # Linux + try: + result = subprocess.run( + ['iw', 'dev'], + capture_output=True, text=True, timeout=5 + ) + current_iface = None + for line in result.stdout.split('\n'): + line = line.strip() + if line.startswith('Interface'): + current_iface = line.split()[1] + elif current_iface and 'type' in line: + devices['wifi_interfaces'].append({ + 'name': current_iface, + 'type': line.split()[-1], + 'monitor_capable': True + }) + current_iface = None + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + # Fall back to iwconfig + try: + result = subprocess.run( + ['iwconfig'], + capture_output=True, text=True, timeout=5 + ) + for line in result.stdout.split('\n'): + if 'IEEE 802.11' in line: + iface = line.split()[0] + devices['wifi_interfaces'].append({ + 'name': iface, + 'type': 'managed', + 'monitor_capable': True + }) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + pass + + # Detect Bluetooth adapters + if platform.system() == 'Linux': + try: + result = subprocess.run( + ['hciconfig'], + capture_output=True, text=True, timeout=5 + ) + import re + blocks = re.split(r'(?=^hci\d+:)', result.stdout, flags=re.MULTILINE) + for block in blocks: + if block.strip(): + first_line = block.split('\n')[0] + match = re.match(r'(hci\d+):', first_line) + if match: + iface_name = match.group(1) + is_up = 'UP RUNNING' in block or '\tUP ' in block + devices['bt_adapters'].append({ + 'name': iface_name, + 'type': 'hci', + 'status': 'up' if is_up else 'down' + }) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + # Try bluetoothctl as fallback + try: + result = subprocess.run( + ['bluetoothctl', 'list'], + capture_output=True, text=True, timeout=5 + ) + for line in result.stdout.split('\n'): + if 'Controller' in line: + # Format: Controller XX:XX:XX:XX:XX:XX Name + parts = line.split() + if len(parts) >= 2: + devices['bt_adapters'].append({ + 'name': parts[1], + 'type': 'controller', + 'status': 'available' + }) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + pass + elif platform.system() == 'Darwin': + # macOS has built-in Bluetooth + devices['bt_adapters'].append({ + 'name': 'default', + 'type': 'macos', + 'status': 'available' + }) + + # Detect SDR devices + try: + from utils.sdr import SDRFactory + sdr_list = SDRFactory.detect_devices() + for i, sdr in enumerate(sdr_list): + devices['sdr_devices'].append({ + 'index': i, + 'name': sdr.get('name', f'SDR Device {i}'), + 'type': sdr.get('type', 'unknown'), + 'serial': sdr.get('serial', '') + }) + except ImportError: + pass + except Exception as e: + logger.warning(f"Error detecting SDR devices: {e}") + + return jsonify({'status': 'success', 'devices': devices}) + + +def _scan_wifi_networks(interface: str) -> list[dict]: + """Scan for WiFi networks using system tools.""" + import platform + import re + import subprocess + + networks = [] + + if platform.system() == 'Darwin': + # macOS: Use airport utility + airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport' + try: + result = subprocess.run( + [airport_path, '-s'], + capture_output=True, text=True, timeout=15 + ) + # Parse airport output + # Format: SSID BSSID RSSI CHANNEL HT CC SECURITY + lines = result.stdout.strip().split('\n') + for line in lines[1:]: # Skip header + if not line.strip(): + continue + # Parse the line - format is space-separated but SSID can have spaces + parts = line.split() + if len(parts) >= 7: + # BSSID is always XX:XX:XX:XX:XX:XX format + bssid_idx = None + for i, p in enumerate(parts): + if re.match(r'^[0-9a-fA-F:]{17}$', p): + bssid_idx = i + break + if bssid_idx is not None: + ssid = ' '.join(parts[:bssid_idx]) if bssid_idx > 0 else '[Hidden]' + bssid = parts[bssid_idx] + rssi = parts[bssid_idx + 1] if len(parts) > bssid_idx + 1 else '-100' + channel = parts[bssid_idx + 2] if len(parts) > bssid_idx + 2 else '0' + security = ' '.join(parts[bssid_idx + 5:]) if len(parts) > bssid_idx + 5 else '' + networks.append({ + 'bssid': bssid.upper(), + 'essid': ssid or '[Hidden]', + 'power': rssi, + 'channel': channel, + 'privacy': security + }) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e: + logger.warning(f"macOS WiFi scan failed: {e}") + + else: + # Linux: Try iwlist scan + iface = interface or 'wlan0' + try: + result = subprocess.run( + ['iwlist', iface, 'scan'], + capture_output=True, text=True, timeout=30 + ) + current_network = {} + for line in result.stdout.split('\n'): + line = line.strip() + if 'Cell' in line and 'Address:' in line: + if current_network.get('bssid'): + networks.append(current_network) + bssid = line.split('Address:')[1].strip() + current_network = {'bssid': bssid.upper(), 'essid': '[Hidden]'} + elif 'ESSID:' in line: + essid = line.split('ESSID:')[1].strip().strip('"') + current_network['essid'] = essid or '[Hidden]' + elif 'Channel:' in line: + channel = line.split('Channel:')[1].strip() + current_network['channel'] = channel + elif 'Signal level=' in line: + match = re.search(r'Signal level[=:]?\s*(-?\d+)', line) + if match: + current_network['power'] = match.group(1) + elif 'Encryption key:' in line: + encrypted = 'on' in line.lower() + current_network['encrypted'] = encrypted + elif 'WPA' in line or 'WPA2' in line: + current_network['privacy'] = 'WPA2' if 'WPA2' in line else 'WPA' + if current_network.get('bssid'): + networks.append(current_network) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e: + logger.warning(f"Linux WiFi scan failed: {e}") + + return networks + + +def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]: + """Scan for Bluetooth devices using system tools.""" + import platform + import os + import pty + import re + import select + import subprocess + + devices = [] + seen_macs = set() + + if platform.system() == 'Darwin': + # macOS: Use system_profiler for basic Bluetooth info + try: + result = subprocess.run( + ['system_profiler', 'SPBluetoothDataType', '-json'], + capture_output=True, text=True, timeout=15 + ) + import json + data = json.loads(result.stdout) + bt_data = data.get('SPBluetoothDataType', [{}])[0] + + # Get connected/paired devices + for section in ['device_connected', 'device_title']: + section_data = bt_data.get(section, {}) + if isinstance(section_data, dict): + for name, info in section_data.items(): + if isinstance(info, dict): + mac = info.get('device_address', '') + if mac and mac not in seen_macs: + seen_macs.add(mac) + devices.append({ + 'mac': mac.upper(), + 'name': name, + 'type': info.get('device_minorType', 'unknown'), + 'connected': section == 'device_connected' + }) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError, json.JSONDecodeError) as e: + logger.warning(f"macOS Bluetooth scan failed: {e}") + + else: + # Linux: Use bluetoothctl or hcitool + iface = interface or 'hci0' + + # Try bluetoothctl first + try: + master_fd, slave_fd = pty.openpty() + process = subprocess.Popen( + ['bluetoothctl'], + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True + ) + os.close(slave_fd) + + # Start scanning + time.sleep(0.3) + os.write(master_fd, b'power on\n') + time.sleep(0.3) + os.write(master_fd, b'scan on\n') + + # Collect devices for specified duration + scan_end = time.time() + duration + buffer = '' + + while time.time() < scan_end and _sweep_running: + readable, _, _ = select.select([master_fd], [], [], 1.0) + if readable: + try: + data = os.read(master_fd, 4096) + if not data: + break + buffer += data.decode('utf-8', errors='replace') + + while '\n' in buffer: + line, buffer = buffer.split('\n', 1) + line = re.sub(r'\x1b\[[0-9;]*m', '', line).strip() + + if 'Device' in line: + match = re.search( + r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:' + r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})\s*(.*)', + line + ) + if match: + mac = match.group(1).upper() + name = match.group(2).strip() + # Remove RSSI from name if present + name = re.sub(r'\s*RSSI:\s*-?\d+\s*', '', name).strip() + + if mac not in seen_macs: + seen_macs.add(mac) + devices.append({ + 'mac': mac, + 'name': name or '[Unknown]' + }) + except OSError: + break + + # Stop scanning and cleanup + try: + os.write(master_fd, b'scan off\n') + time.sleep(0.2) + os.write(master_fd, b'quit\n') + except OSError: + pass + + process.terminate() + try: + process.wait(timeout=2) + except subprocess.TimeoutExpired: + process.kill() + + try: + os.close(master_fd) + except OSError: + pass + + except (FileNotFoundError, subprocess.SubprocessError) as e: + logger.warning(f"bluetoothctl scan failed: {e}") + + # Fallback to hcitool + try: + result = subprocess.run( + ['hcitool', '-i', iface, 'scan'], + capture_output=True, text=True, timeout=duration + 5 + ) + for line in result.stdout.split('\n'): + parts = line.split() + if len(parts) >= 1 and ':' in parts[0]: + mac = parts[0].upper() + name = ' '.join(parts[1:]) if len(parts) > 1 else '[Unknown]' + if mac not in seen_macs: + seen_macs.add(mac) + devices.append({'mac': mac, 'name': name}) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e: + logger.warning(f"hcitool scan failed: {e}") + + return devices + + def _run_sweep( sweep_type: str, baseline_id: int | None, wifi_enabled: bool, bt_enabled: bool, - rf_enabled: bool + rf_enabled: bool, + wifi_interface: str = '', + bt_interface: str = '', + sdr_device: int | None = None ) -> None: """ Run the TSCM sweep in a background thread. @@ -337,41 +712,62 @@ def _run_sweep( # Collect and analyze data threats_found = 0 - all_wifi = [] - all_bt = [] + severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0} + all_wifi = {} # Use dict for deduplication by BSSID + all_bt = {} # Use dict for deduplication by MAC all_rf = [] start_time = time.time() + last_wifi_scan = 0 + last_bt_scan = 0 + wifi_scan_interval = 15 # Scan WiFi every 15 seconds + bt_scan_interval = 20 # Scan Bluetooth every 20 seconds while _sweep_running and (time.time() - start_time) < duration: - # Import app module to access shared data stores - try: - import app as app_module + current_time = time.time() - # Collect WiFi data - if wifi_enabled and hasattr(app_module, 'wifi_networks'): - wifi_data = list(app_module.wifi_networks.data.values()) - for device in wifi_data: - if device not in all_wifi: - all_wifi.append(device) - threat = detector.analyze_wifi_device(device) + # Perform WiFi scan + if wifi_enabled and (current_time - last_wifi_scan) >= wifi_scan_interval: + try: + wifi_networks = _scan_wifi_networks(wifi_interface) + for network in wifi_networks: + bssid = network.get('bssid', '') + if bssid and bssid not in all_wifi: + all_wifi[bssid] = network + # Analyze for threats + threat = detector.analyze_wifi_device(network) if threat: _handle_threat(threat) threats_found += 1 + sev = threat.get('severity', 'low').lower() + if sev in severity_counts: + severity_counts[sev] += 1 + last_wifi_scan = current_time + except Exception as e: + logger.error(f"WiFi scan error: {e}") - # Collect Bluetooth data - if bt_enabled and hasattr(app_module, 'bt_devices'): - bt_data = list(app_module.bt_devices.data.values()) - for device in bt_data: - if device not in all_bt: - all_bt.append(device) + # Perform Bluetooth scan + if bt_enabled and (current_time - last_bt_scan) >= bt_scan_interval: + try: + bt_devices = _scan_bluetooth_devices(bt_interface, duration=8) + for device in bt_devices: + mac = device.get('mac', '') + if mac and mac not in all_bt: + all_bt[mac] = device + # Analyze for threats threat = detector.analyze_bt_device(device) if threat: _handle_threat(threat) threats_found += 1 + sev = threat.get('severity', 'low').lower() + if sev in severity_counts: + severity_counts[sev] += 1 + last_bt_scan = current_time + except Exception as e: + logger.error(f"Bluetooth scan error: {e}") - except ImportError: - logger.warning("Could not import app module for data collection") + # RF scanning would go here if SDR is available + # For now, RF scanning is not implemented # Update progress elapsed = time.time() - start_time @@ -385,6 +781,7 @@ def _run_sweep( 'bt_count': len(all_bt), 'rf_count': len(all_rf), 'threats_found': threats_found, + 'severity_counts': severity_counts, }) time.sleep(2) # Update every 2 seconds @@ -398,6 +795,7 @@ def _run_sweep( 'wifi_devices': len(all_wifi), 'bt_devices': len(all_bt), 'rf_signals': len(all_rf), + 'severity_counts': severity_counts, }, threats_found=threats_found, completed=True @@ -409,6 +807,7 @@ def _run_sweep( 'wifi_count': len(all_wifi), 'bt_count': len(all_bt), 'rf_count': len(all_rf), + 'severity_counts': severity_counts, }) except Exception as e: diff --git a/templates/index.html b/templates/index.html index 7ca5804..1f1a550 100644 --- a/templates/index.html +++ b/templates/index.html @@ -1050,20 +1050,36 @@