Add unified WiFi scanning module with dual-mode architecture

Backend:
- New utils/wifi/ package with models, scanner, parsers, channel analyzer
- Quick Scan mode using system tools (nmcli, iw, iwlist, airport)
- Deep Scan mode using airodump-ng with monitor mode
- Hidden SSID correlation engine
- Channel utilization analysis with recommendations
- v2 API endpoints at /wifi/v2/* with SSE streaming
- TSCM integration updated to use new scanner (backwards compatible)

Frontend:
- WiFi mode controller (wifi.js) with dual-mode support
- Channel utilization chart component (channel-chart.js)
- Updated wifi.html template with scan mode tabs and export

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-01-21 22:06:16 +00:00
parent 3d0c505178
commit 9515f5fd7a
19 changed files with 6105 additions and 156 deletions

View File

@@ -6,6 +6,7 @@ def register_blueprints(app):
from .sensor import sensor_bp
from .rtlamr import rtlamr_bp
from .wifi import wifi_bp
from .wifi_v2 import wifi_v2_bp
from .bluetooth import bluetooth_bp
from .bluetooth_v2 import bluetooth_v2_bp
from .adsb import adsb_bp
@@ -22,6 +23,7 @@ def register_blueprints(app):
app.register_blueprint(sensor_bp)
app.register_blueprint(rtlamr_bp)
app.register_blueprint(wifi_bp)
app.register_blueprint(wifi_v2_bp) # New unified WiFi API
app.register_blueprint(bluetooth_bp)
app.register_blueprint(bluetooth_v2_bp) # New unified Bluetooth API
app.register_blueprint(adsb_bp)

View File

@@ -636,166 +636,41 @@ def get_tscm_devices():
def _scan_wifi_networks(interface: str) -> list[dict]:
"""Scan for WiFi networks using system tools."""
import platform
import re
import subprocess
"""
Scan for WiFi networks using the unified WiFi scanner.
networks = []
This is a facade that maintains backwards compatibility with TSCM
while using the new unified scanner module.
if platform.system() == 'Darwin':
# macOS: Use airport utility
airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport'
try:
result = subprocess.run(
[airport_path, '-s'],
capture_output=True, text=True, timeout=15
)
# Parse airport output
# Format: SSID BSSID RSSI CHANNEL HT CC SECURITY
lines = result.stdout.strip().split('\n')
for line in lines[1:]: # Skip header
if not line.strip():
continue
# Parse the line - format is space-separated but SSID can have spaces
parts = line.split()
if len(parts) >= 7:
# BSSID is always XX:XX:XX:XX:XX:XX format
bssid_idx = None
for i, p in enumerate(parts):
if re.match(r'^[0-9a-fA-F:]{17}$', p):
bssid_idx = i
break
if bssid_idx is not None:
ssid = ' '.join(parts[:bssid_idx]) if bssid_idx > 0 else '[Hidden]'
bssid = parts[bssid_idx]
rssi = parts[bssid_idx + 1] if len(parts) > bssid_idx + 1 else '-100'
channel = parts[bssid_idx + 2] if len(parts) > bssid_idx + 2 else '0'
security = ' '.join(parts[bssid_idx + 5:]) if len(parts) > bssid_idx + 5 else ''
networks.append({
'bssid': bssid.upper(),
'essid': ssid or '[Hidden]',
'power': rssi,
'channel': channel,
'privacy': security
})
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
logger.warning(f"macOS WiFi scan failed: {e}")
Args:
interface: WiFi interface name (optional).
else:
# Linux: Try multiple scan methods
import shutil
Returns:
List of network dicts with: bssid, essid, power, channel, privacy
"""
try:
from utils.wifi import get_wifi_scanner
# Detect wireless interface if not specified
if not interface:
try:
import glob
wireless_paths = glob.glob('/sys/class/net/*/wireless')
if wireless_paths:
iface = wireless_paths[0].split('/')[4]
else:
iface = 'wlan0'
except Exception:
iface = 'wlan0'
else:
iface = interface
scanner = get_wifi_scanner()
result = scanner.quick_scan(interface=interface, timeout=15)
logger.info(f"WiFi scan using interface: {iface}")
if result.error:
logger.warning(f"WiFi scan error: {result.error}")
# Method 1: Try iw scan (sometimes works without root)
if shutil.which('iw'):
try:
logger.info("Trying 'iw' scan...")
result = subprocess.run(
['iw', 'dev', iface, 'scan'],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and 'BSS' in result.stdout:
# Parse iw output
current_bss = None
for line in result.stdout.split('\n'):
if line.startswith('BSS '):
if current_bss and current_bss.get('bssid'):
networks.append(current_bss)
# Extract BSSID from "BSS xx:xx:xx:xx:xx:xx(on wlan0)"
bssid_match = re.search(r'BSS ([0-9a-fA-F:]{17})', line)
if bssid_match:
current_bss = {'bssid': bssid_match.group(1).upper(), 'essid': '[Hidden]'}
elif current_bss:
line = line.strip()
if line.startswith('SSID:'):
ssid = line[5:].strip()
current_bss['essid'] = ssid or '[Hidden]'
elif line.startswith('signal:'):
sig_match = re.search(r'(-?\d+)', line)
if sig_match:
current_bss['power'] = sig_match.group(1)
elif line.startswith('freq:'):
freq = line[5:].strip()
# Convert frequency to channel
try:
freq_mhz = int(freq)
if freq_mhz < 3000:
channel = (freq_mhz - 2407) // 5
else:
channel = (freq_mhz - 5000) // 5
current_bss['channel'] = str(channel)
except ValueError:
pass
elif 'WPA' in line or 'RSN' in line:
current_bss['privacy'] = 'WPA2' if 'RSN' in line else 'WPA'
if current_bss and current_bss.get('bssid'):
networks.append(current_bss)
logger.info(f"iw scan found {len(networks)} networks")
elif 'Operation not permitted' in result.stderr or result.returncode != 0:
logger.warning(f"iw scan requires root: {result.stderr[:100]}")
except (subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
logger.warning(f"iw scan failed: {e}")
# Convert to legacy format for TSCM
networks = []
for ap in result.access_points:
networks.append(ap.to_legacy_dict())
# Method 2: Try iwlist scan if iw didn't work
if not networks and shutil.which('iwlist'):
try:
logger.info("Trying 'iwlist' scan...")
result = subprocess.run(
['iwlist', iface, 'scan'],
capture_output=True, text=True, timeout=30
)
if 'Operation not permitted' in result.stderr:
logger.warning("iwlist scan requires root privileges")
else:
current_network = {}
for line in result.stdout.split('\n'):
line = line.strip()
if 'Cell' in line and 'Address:' in line:
if current_network.get('bssid'):
networks.append(current_network)
bssid = line.split('Address:')[1].strip()
current_network = {'bssid': bssid.upper(), 'essid': '[Hidden]'}
elif 'ESSID:' in line:
essid = line.split('ESSID:')[1].strip().strip('"')
current_network['essid'] = essid or '[Hidden]'
elif 'Channel:' in line:
channel = line.split('Channel:')[1].strip()
current_network['channel'] = channel
elif 'Signal level=' in line:
match = re.search(r'Signal level[=:]?\s*(-?\d+)', line)
if match:
current_network['power'] = match.group(1)
elif 'Encryption key:' in line:
encrypted = 'on' in line.lower()
current_network['encrypted'] = encrypted
elif 'WPA' in line or 'WPA2' in line:
current_network['privacy'] = 'WPA2' if 'WPA2' in line else 'WPA'
if current_network.get('bssid'):
networks.append(current_network)
logger.info(f"iwlist scan found {len(networks)} networks")
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
logger.warning(f"iwlist scan failed: {e}")
logger.info(f"WiFi scan found {len(networks)} networks")
return networks
if not networks:
logger.warning("WiFi scanning requires root privileges. Run with sudo for WiFi scanning.")
return networks
except ImportError as e:
logger.error(f"Failed to import wifi scanner: {e}")
return []
except Exception as e:
logger.exception(f"WiFi scan failed: {e}")
return []
def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]:

516
routes/wifi_v2.py Normal file
View File

@@ -0,0 +1,516 @@
"""
WiFi v2 API routes.
New unified WiFi scanning API with Quick Scan and Deep Scan modes,
channel analysis, hidden SSID correlation, and SSE streaming.
"""
from __future__ import annotations
import csv
import io
import json
import logging
from datetime import datetime
from typing import Generator
from flask import Blueprint, jsonify, request, Response
from utils.wifi import (
get_wifi_scanner,
analyze_channels,
get_hidden_correlator,
SCAN_MODE_QUICK,
SCAN_MODE_DEEP,
)
from utils.sse import format_sse
logger = logging.getLogger(__name__)
wifi_v2_bp = Blueprint('wifi_v2', __name__, url_prefix='/wifi/v2')
# =============================================================================
# Capabilities
# =============================================================================
@wifi_v2_bp.route('/capabilities', methods=['GET'])
def get_capabilities():
"""
Get WiFi scanning capabilities.
Returns available tools, interfaces, and scan mode support.
"""
scanner = get_wifi_scanner()
caps = scanner.check_capabilities()
return jsonify(caps.to_dict())
# =============================================================================
# Quick Scan
# =============================================================================
@wifi_v2_bp.route('/scan/quick', methods=['POST'])
def quick_scan():
"""
Perform a quick one-shot WiFi scan.
Uses system tools (nmcli, iw, iwlist, airport) without monitor mode.
Request body:
interface: Optional interface name
timeout: Optional scan timeout in seconds (default 15)
Returns:
WiFiScanResult with discovered networks and channel analysis.
"""
data = request.get_json() or {}
interface = data.get('interface')
timeout = float(data.get('timeout', 15))
scanner = get_wifi_scanner()
result = scanner.quick_scan(interface=interface, timeout=timeout)
return jsonify(result.to_dict())
# =============================================================================
# Deep Scan (Monitor Mode)
# =============================================================================
@wifi_v2_bp.route('/scan/start', methods=['POST'])
def start_deep_scan():
"""
Start a deep scan using airodump-ng.
Requires monitor mode interface and root privileges.
Request body:
interface: Monitor mode interface (e.g., 'wlan0mon')
band: Band to scan ('2.4', '5', 'all')
channel: Optional specific channel to monitor
"""
data = request.get_json() or {}
interface = data.get('interface')
band = data.get('band', 'all')
channel = data.get('channel')
if channel:
try:
channel = int(channel)
except ValueError:
return jsonify({'error': 'Invalid channel'}), 400
scanner = get_wifi_scanner()
success = scanner.start_deep_scan(
interface=interface,
band=band,
channel=channel,
)
if success:
return jsonify({
'status': 'started',
'mode': SCAN_MODE_DEEP,
'interface': interface or scanner._capabilities.monitor_interface,
})
else:
return jsonify({
'status': 'error',
'error': scanner._status.error,
}), 400
@wifi_v2_bp.route('/scan/stop', methods=['POST'])
def stop_deep_scan():
"""Stop the deep scan."""
scanner = get_wifi_scanner()
scanner.stop_deep_scan()
return jsonify({
'status': 'stopped',
})
@wifi_v2_bp.route('/scan/status', methods=['GET'])
def get_scan_status():
"""Get current scan status."""
scanner = get_wifi_scanner()
status = scanner.get_status()
return jsonify(status.to_dict())
# =============================================================================
# Data Endpoints
# =============================================================================
@wifi_v2_bp.route('/networks', methods=['GET'])
def get_networks():
"""
Get all discovered networks.
Query params:
band: Filter by band ('2.4GHz', '5GHz', '6GHz')
security: Filter by security type ('Open', 'WEP', 'WPA', 'WPA2', 'WPA3')
hidden: Filter hidden networks only (true/false)
min_rssi: Minimum RSSI threshold
sort: Sort field ('rssi', 'channel', 'essid', 'last_seen')
order: Sort order ('asc', 'desc')
format: Response format ('full', 'summary')
"""
scanner = get_wifi_scanner()
networks = scanner.access_points
# Apply filters
band = request.args.get('band')
if band:
networks = [n for n in networks if n.band == band]
security = request.args.get('security')
if security:
networks = [n for n in networks if n.security == security]
hidden = request.args.get('hidden')
if hidden == 'true':
networks = [n for n in networks if n.is_hidden]
elif hidden == 'false':
networks = [n for n in networks if not n.is_hidden]
min_rssi = request.args.get('min_rssi')
if min_rssi:
try:
min_rssi = int(min_rssi)
networks = [n for n in networks if n.rssi_current and n.rssi_current >= min_rssi]
except ValueError:
pass
# Apply sorting
sort_field = request.args.get('sort', 'rssi')
order = request.args.get('order', 'desc')
reverse = order == 'desc'
sort_key_map = {
'rssi': lambda n: n.rssi_current or -100,
'channel': lambda n: n.channel or 0,
'essid': lambda n: (n.essid or '').lower(),
'last_seen': lambda n: n.last_seen,
'clients': lambda n: n.client_count,
}
if sort_field in sort_key_map:
networks.sort(key=sort_key_map[sort_field], reverse=reverse)
# Format output
output_format = request.args.get('format', 'summary')
if output_format == 'full':
return jsonify([n.to_dict() for n in networks])
else:
return jsonify([n.to_summary_dict() for n in networks])
@wifi_v2_bp.route('/networks/<bssid>', methods=['GET'])
def get_network(bssid):
"""Get a specific network by BSSID."""
scanner = get_wifi_scanner()
network = scanner.get_network(bssid)
if network:
return jsonify(network.to_dict())
else:
return jsonify({'error': 'Network not found'}), 404
@wifi_v2_bp.route('/clients', methods=['GET'])
def get_clients():
"""
Get all discovered clients.
Query params:
associated: Filter by association status (true/false)
bssid: Filter by associated BSSID
min_rssi: Minimum RSSI threshold
"""
scanner = get_wifi_scanner()
clients = scanner.clients
# Apply filters
associated = request.args.get('associated')
if associated == 'true':
clients = [c for c in clients if c.is_associated]
elif associated == 'false':
clients = [c for c in clients if not c.is_associated]
bssid = request.args.get('bssid')
if bssid:
clients = [c for c in clients if c.associated_bssid == bssid.upper()]
min_rssi = request.args.get('min_rssi')
if min_rssi:
try:
min_rssi = int(min_rssi)
clients = [c for c in clients if c.rssi_current and c.rssi_current >= min_rssi]
except ValueError:
pass
return jsonify([c.to_dict() for c in clients])
@wifi_v2_bp.route('/clients/<mac>', methods=['GET'])
def get_client(mac):
"""Get a specific client by MAC address."""
scanner = get_wifi_scanner()
client = scanner.get_client(mac)
if client:
return jsonify(client.to_dict())
else:
return jsonify({'error': 'Client not found'}), 404
@wifi_v2_bp.route('/probes', methods=['GET'])
def get_probes():
"""
Get captured probe requests.
Query params:
client_mac: Filter by client MAC
ssid: Filter by probed SSID
limit: Maximum number of results
"""
scanner = get_wifi_scanner()
probes = scanner.probe_requests
# Apply filters
client_mac = request.args.get('client_mac')
if client_mac:
probes = [p for p in probes if p.client_mac == client_mac.upper()]
ssid = request.args.get('ssid')
if ssid:
probes = [p for p in probes if p.probed_ssid == ssid]
# Apply limit
limit = request.args.get('limit')
if limit:
try:
limit = int(limit)
probes = probes[-limit:] # Most recent
except ValueError:
pass
return jsonify([p.to_dict() for p in probes])
# =============================================================================
# Channel Analysis
# =============================================================================
@wifi_v2_bp.route('/channels', methods=['GET'])
def get_channel_stats():
"""
Get channel utilization statistics and recommendations.
Query params:
include_dfs: Include DFS channels in recommendations (true/false)
"""
scanner = get_wifi_scanner()
include_dfs = request.args.get('include_dfs', 'false') == 'true'
stats, recommendations = analyze_channels(
scanner.access_points,
include_dfs=include_dfs,
)
return jsonify({
'stats': [s.to_dict() for s in stats],
'recommendations': [r.to_dict() for r in recommendations],
})
# =============================================================================
# Hidden SSID Correlation
# =============================================================================
@wifi_v2_bp.route('/hidden', methods=['GET'])
def get_hidden_correlations():
"""
Get revealed hidden SSIDs from correlation.
Returns mapping of BSSID -> revealed SSID.
"""
correlator = get_hidden_correlator()
return jsonify(correlator.get_all_revealed())
# =============================================================================
# Baseline Management
# =============================================================================
@wifi_v2_bp.route('/baseline/set', methods=['POST'])
def set_baseline():
"""Mark current networks as baseline (known networks)."""
scanner = get_wifi_scanner()
scanner.set_baseline()
return jsonify({
'status': 'baseline_set',
'network_count': len(scanner._baseline_networks),
'set_at': datetime.now().isoformat(),
})
@wifi_v2_bp.route('/baseline/clear', methods=['POST'])
def clear_baseline():
"""Clear the baseline."""
scanner = get_wifi_scanner()
scanner.clear_baseline()
return jsonify({
'status': 'baseline_cleared',
})
# =============================================================================
# SSE Streaming
# =============================================================================
@wifi_v2_bp.route('/stream', methods=['GET'])
def event_stream():
"""
Server-Sent Events stream for real-time updates.
Events:
- network_update: Network discovered/updated
- client_update: Client discovered/updated
- probe_request: Probe request detected
- hidden_revealed: Hidden SSID revealed
- scan_started, scan_stopped, scan_error
- keepalive: Periodic keepalive
"""
def generate() -> Generator[str, None, None]:
scanner = get_wifi_scanner()
for event in scanner.get_event_stream():
yield format_sse(event)
response = Response(generate(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
return response
# =============================================================================
# Data Management
# =============================================================================
@wifi_v2_bp.route('/clear', methods=['POST'])
def clear_data():
"""Clear all discovered data."""
scanner = get_wifi_scanner()
scanner.clear_data()
return jsonify({
'status': 'cleared',
})
# =============================================================================
# Export
# =============================================================================
@wifi_v2_bp.route('/export', methods=['GET'])
def export_data():
"""
Export scan data.
Query params:
format: 'json' or 'csv' (default: json)
type: 'networks', 'clients', 'probes', 'all' (default: all)
"""
scanner = get_wifi_scanner()
export_format = request.args.get('format', 'json')
export_type = request.args.get('type', 'all')
if export_format == 'csv':
return _export_csv(scanner, export_type)
else:
return _export_json(scanner, export_type)
def _export_json(scanner, export_type: str) -> Response:
"""Export data as JSON."""
data = {}
if export_type in ('networks', 'all'):
data['networks'] = [n.to_dict() for n in scanner.access_points]
if export_type in ('clients', 'all'):
data['clients'] = [c.to_dict() for c in scanner.clients]
if export_type in ('probes', 'all'):
data['probes'] = [p.to_dict() for p in scanner.probe_requests]
data['exported_at'] = datetime.now().isoformat()
data['network_count'] = len(scanner.access_points)
data['client_count'] = len(scanner.clients)
response = Response(
json.dumps(data, indent=2),
mimetype='application/json',
)
response.headers['Content-Disposition'] = f'attachment; filename=wifi_scan_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
return response
def _export_csv(scanner, export_type: str) -> Response:
"""Export data as CSV."""
output = io.StringIO()
if export_type in ('networks', 'all'):
writer = csv.writer(output)
writer.writerow([
'BSSID', 'ESSID', 'Channel', 'Band', 'RSSI', 'Security',
'Cipher', 'Auth', 'Vendor', 'Clients', 'First Seen', 'Last Seen'
])
for n in scanner.access_points:
writer.writerow([
n.bssid,
n.essid or '[Hidden]',
n.channel,
n.band,
n.rssi_current,
n.security,
n.cipher,
n.auth,
n.vendor or '',
n.client_count,
n.first_seen.isoformat(),
n.last_seen.isoformat(),
])
if export_type == 'all':
writer.writerow([]) # Blank line separator
if export_type in ('clients', 'all'):
writer = csv.writer(output)
if export_type == 'clients':
writer.writerow([
'MAC', 'Vendor', 'RSSI', 'Associated BSSID', 'Probed SSIDs',
'First Seen', 'Last Seen'
])
for c in scanner.clients:
writer.writerow([
c.mac,
c.vendor or '',
c.rssi_current,
c.associated_bssid or '',
', '.join(c.probed_ssids),
c.first_seen.isoformat(),
c.last_seen.isoformat(),
])
response = Response(output.getvalue(), mimetype='text/csv')
response.headers['Content-Disposition'] = f'attachment; filename=wifi_scan_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
return response

View File

@@ -0,0 +1,286 @@
/**
* WiFi Channel Utilization Chart Component
*
* Displays channel utilization as a bar chart with recommendations.
* Shows AP count, client count, and utilization score per channel.
*/
const ChannelChart = (function() {
'use strict';
// ==========================================================================
// Configuration
// ==========================================================================
const CONFIG = {
height: 150,
barWidth: 20,
barSpacing: 4,
padding: { top: 20, right: 20, bottom: 30, left: 40 },
colors: {
low: '#22c55e', // Green - low utilization
medium: '#eab308', // Yellow - medium
high: '#ef4444', // Red - high
recommended: '#3b82f6', // Blue - recommended
},
thresholds: {
low: 0.3,
medium: 0.6,
},
};
// 2.4 GHz non-overlapping channels
const CHANNELS_2_4 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
const NON_OVERLAPPING_2_4 = [1, 6, 11];
// 5 GHz channels (non-DFS)
const CHANNELS_5 = [36, 40, 44, 48, 149, 153, 157, 161, 165];
// ==========================================================================
// State
// ==========================================================================
let container = null;
let currentBand = '2.4';
let channelStats = [];
let recommendations = [];
// ==========================================================================
// Initialization
// ==========================================================================
function init(containerId, options = {}) {
container = document.getElementById(containerId);
if (!container) {
console.warn('[ChannelChart] Container not found:', containerId);
return;
}
Object.assign(CONFIG, options);
render();
}
// ==========================================================================
// Update
// ==========================================================================
function update(stats, recs) {
channelStats = stats || [];
recommendations = recs || [];
render();
}
function setBand(band) {
currentBand = band;
render();
}
// ==========================================================================
// Rendering
// ==========================================================================
function render() {
if (!container) return;
const channels = currentBand === '2.4' ? CHANNELS_2_4 : CHANNELS_5;
const nonOverlapping = currentBand === '2.4' ? NON_OVERLAPPING_2_4 : CHANNELS_5;
// Build stats map
const statsMap = {};
channelStats.forEach(s => {
statsMap[s.channel] = s;
});
// Build recommendations map
const recsMap = {};
recommendations.forEach((r, i) => {
recsMap[r.channel] = { rank: i + 1, ...r };
});
// Calculate dimensions
const width = channels.length * (CONFIG.barWidth + CONFIG.barSpacing) + CONFIG.padding.left + CONFIG.padding.right;
const height = CONFIG.height + CONFIG.padding.top + CONFIG.padding.bottom;
const chartHeight = CONFIG.height;
// Find max values for scaling
let maxApCount = 1;
channelStats.forEach(s => {
if (s.ap_count > maxApCount) maxApCount = s.ap_count;
});
// Build SVG
let svg = `
<svg width="${width}" height="${height}" class="channel-chart-svg">
<defs>
<linearGradient id="utilGradientLow" x1="0%" y1="0%" x2="0%" y2="100%">
<stop offset="0%" style="stop-color:${CONFIG.colors.low};stop-opacity:0.9" />
<stop offset="100%" style="stop-color:${CONFIG.colors.low};stop-opacity:0.5" />
</linearGradient>
<linearGradient id="utilGradientMed" x1="0%" y1="0%" x2="0%" y2="100%">
<stop offset="0%" style="stop-color:${CONFIG.colors.medium};stop-opacity:0.9" />
<stop offset="100%" style="stop-color:${CONFIG.colors.medium};stop-opacity:0.5" />
</linearGradient>
<linearGradient id="utilGradientHigh" x1="0%" y1="0%" x2="0%" y2="100%">
<stop offset="0%" style="stop-color:${CONFIG.colors.high};stop-opacity:0.9" />
<stop offset="100%" style="stop-color:${CONFIG.colors.high};stop-opacity:0.5" />
</linearGradient>
</defs>
<!-- Y-axis label -->
<text x="10" y="${height / 2}" fill="#666" font-size="10" transform="rotate(-90, 10, ${height / 2})" text-anchor="middle">APs</text>
<!-- Y-axis ticks -->
${renderYAxis(chartHeight, maxApCount)}
<!-- Bars -->
<g transform="translate(${CONFIG.padding.left}, ${CONFIG.padding.top})">
${channels.map((ch, i) => {
const stats = statsMap[ch] || { ap_count: 0, utilization_score: 0 };
const rec = recsMap[ch];
const isNonOverlapping = nonOverlapping.includes(ch);
return renderBar(i, ch, stats, rec, isNonOverlapping, chartHeight, maxApCount);
}).join('')}
</g>
<!-- X-axis labels -->
<g transform="translate(${CONFIG.padding.left}, ${CONFIG.padding.top + chartHeight + 5})">
${channels.map((ch, i) => {
const x = i * (CONFIG.barWidth + CONFIG.barSpacing) + CONFIG.barWidth / 2;
const isNonOverlapping = nonOverlapping.includes(ch);
return `<text x="${x}" y="12" fill="${isNonOverlapping ? '#fff' : '#666'}" font-size="9" text-anchor="middle">${ch}</text>`;
}).join('')}
</g>
</svg>
`;
// Add legend
svg += renderLegend();
// Add recommendations
if (recommendations.length > 0) {
svg += renderRecommendations();
}
container.innerHTML = svg;
}
function renderYAxis(chartHeight, maxApCount) {
const ticks = [];
const tickCount = Math.min(5, maxApCount);
const step = Math.ceil(maxApCount / tickCount);
for (let i = 0; i <= maxApCount; i += step) {
const y = CONFIG.padding.top + chartHeight - (i / maxApCount * chartHeight);
ticks.push(`
<line x1="${CONFIG.padding.left - 5}" y1="${y}" x2="${CONFIG.padding.left}" y2="${y}" stroke="#444" />
<text x="${CONFIG.padding.left - 8}" y="${y + 3}" fill="#666" font-size="9" text-anchor="end">${i}</text>
`);
}
return ticks.join('');
}
function renderBar(index, channel, stats, rec, isNonOverlapping, chartHeight, maxApCount) {
const x = index * (CONFIG.barWidth + CONFIG.barSpacing);
const barHeight = (stats.ap_count / maxApCount) * chartHeight;
const y = chartHeight - barHeight;
// Determine color based on utilization
let gradient = 'utilGradientLow';
if (stats.utilization_score >= CONFIG.thresholds.medium) {
gradient = 'utilGradientHigh';
} else if (stats.utilization_score >= CONFIG.thresholds.low) {
gradient = 'utilGradientMed';
}
// Recommended channel indicator
const isRecommended = rec && rec.rank <= 3;
const recIndicator = isRecommended ?
`<circle cx="${x + CONFIG.barWidth / 2}" cy="${chartHeight + 20}" r="4" fill="${CONFIG.colors.recommended}" />
<text x="${x + CONFIG.barWidth / 2}" y="${chartHeight + 23}" fill="#fff" font-size="7" text-anchor="middle">${rec.rank}</text>` : '';
// Non-overlapping channel marker
const channelMarker = isNonOverlapping ?
`<rect x="${x}" y="${chartHeight}" width="${CONFIG.barWidth}" height="2" fill="#3b82f6" />` : '';
return `
<g class="channel-bar" data-channel="${channel}">
<!-- Bar background -->
<rect x="${x}" y="0" width="${CONFIG.barWidth}" height="${chartHeight}"
fill="#1a1a2e" rx="2" />
<!-- Utilization bar -->
<rect x="${x}" y="${y}" width="${CONFIG.barWidth}" height="${barHeight}"
fill="url(#${gradient})" rx="2" />
<!-- AP count label -->
${stats.ap_count > 0 ? `
<text x="${x + CONFIG.barWidth / 2}" y="${y - 4}" fill="#fff" font-size="9" text-anchor="middle">
${stats.ap_count}
</text>
` : ''}
${channelMarker}
${recIndicator}
<!-- Hover area -->
<rect x="${x}" y="0" width="${CONFIG.barWidth}" height="${chartHeight}"
fill="transparent" class="channel-hover" />
</g>
`;
}
function renderLegend() {
return `
<div class="channel-chart-legend" style="display: flex; gap: 16px; justify-content: center; margin-top: 8px; font-size: 10px;">
<div style="display: flex; align-items: center; gap: 4px;">
<span style="width: 12px; height: 12px; background: ${CONFIG.colors.low}; border-radius: 2px;"></span>
<span style="color: #888;">Low</span>
</div>
<div style="display: flex; align-items: center; gap: 4px;">
<span style="width: 12px; height: 12px; background: ${CONFIG.colors.medium}; border-radius: 2px;"></span>
<span style="color: #888;">Medium</span>
</div>
<div style="display: flex; align-items: center; gap: 4px;">
<span style="width: 12px; height: 12px; background: ${CONFIG.colors.high}; border-radius: 2px;"></span>
<span style="color: #888;">High</span>
</div>
<div style="display: flex; align-items: center; gap: 4px;">
<span style="width: 12px; height: 3px; background: #3b82f6; border-radius: 1px;"></span>
<span style="color: #888;">Non-overlapping</span>
</div>
</div>
`;
}
function renderRecommendations() {
const topRecs = recommendations.slice(0, 3);
if (topRecs.length === 0) return '';
return `
<div class="channel-chart-recommendations" style="margin-top: 12px; padding: 8px; background: #1a1a2e; border-radius: 4px;">
<div style="font-size: 10px; color: #888; margin-bottom: 6px;">Recommended Channels:</div>
<div style="display: flex; gap: 8px; flex-wrap: wrap;">
${topRecs.map((rec, i) => `
<div style="display: flex; align-items: center; gap: 4px; padding: 4px 8px; background: ${i === 0 ? 'rgba(59, 130, 246, 0.2)' : '#0d0d1a'}; border-radius: 4px; border: 1px solid ${i === 0 ? '#3b82f6' : '#333'};">
<span style="font-size: 11px; font-weight: bold; color: ${i === 0 ? '#3b82f6' : '#666'};">#${i + 1}</span>
<span style="font-size: 12px; color: #fff;">Ch ${rec.channel}</span>
<span style="font-size: 9px; color: #666;">(${rec.band})</span>
${rec.is_dfs ? '<span style="font-size: 8px; color: #ff6b6b; margin-left: 4px;">DFS</span>' : ''}
</div>
`).join('')}
</div>
</div>
`;
}
// ==========================================================================
// Public API
// ==========================================================================
return {
init,
update,
setBand,
};
})();

1005
static/js/modes/wifi.js Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1565,6 +1565,9 @@
<script src="{{ url_for('static', filename='js/components/proximity-radar.js') }}"></script>
<script src="{{ url_for('static', filename='js/components/timeline-heatmap.js') }}"></script>
<script src="{{ url_for('static', filename='js/modes/bluetooth.js') }}"></script>
<!-- WiFi v2 components -->
<script src="{{ url_for('static', filename='js/components/channel-chart.js') }}"></script>
<script src="{{ url_for('static', filename='js/modes/wifi.js') }}"></script>
<script src="{{ url_for('static', filename='js/modes/listening-post.js') }}"></script>
<script>

View File

@@ -1,5 +1,18 @@
<!-- WiFi MODE -->
<div id="wifiMode" class="mode-content">
<!-- Scan Mode Tabs -->
<div class="section" style="padding: 8px;">
<div class="wifi-scan-mode-tabs" style="display: flex; gap: 4px;">
<button id="wifiScanModeQuick" class="wifi-mode-tab active" style="flex: 1; padding: 8px; font-size: 11px; background: var(--accent-green); color: #000; border: none; border-radius: 4px; cursor: pointer;">
Quick Scan
</button>
<button id="wifiScanModeDeep" class="wifi-mode-tab" style="flex: 1; padding: 8px; font-size: 11px; background: var(--bg-tertiary); color: #888; border: 1px solid var(--border-color); border-radius: 4px; cursor: pointer;">
Deep Scan
</button>
</div>
<div id="wifiCapabilityStatus" class="info-text" style="margin-top: 8px; font-size: 10px;"></div>
</div>
<div class="section">
<h3>WiFi Adapter</h3>
<div class="form-group">
@@ -133,10 +146,37 @@
</div>
</div>
<button class="run-btn" id="startWifiBtn" onclick="startWifiScan()">
Start Scanning
</button>
<button class="stop-btn" id="stopWifiBtn" onclick="stopWifiScan()" style="display: none;">
<!-- v2 Scan Buttons -->
<div style="display: flex; gap: 8px; margin-bottom: 8px;">
<button class="run-btn" id="wifiQuickScanBtn" onclick="WiFiMode.startQuickScan()" style="flex: 1;">
Quick Scan
</button>
<button class="run-btn" id="wifiDeepScanBtn" onclick="WiFiMode.startDeepScan()" style="flex: 1; background: var(--accent-orange);">
Deep Scan
</button>
</div>
<button class="stop-btn" id="wifiStopScanBtn" onclick="WiFiMode.stopScan()" style="display: none; width: 100%;">
Stop Scanning
</button>
<!-- Legacy Scan Buttons (hidden, for backwards compatibility) -->
<button class="run-btn" id="startWifiBtn" onclick="startWifiScan()" style="display: none;">
Start Scanning (Legacy)
</button>
<button class="stop-btn" id="stopWifiBtn" onclick="stopWifiScan()" style="display: none;">
Stop Scanning (Legacy)
</button>
<!-- Export Section -->
<div class="section" style="margin-top: 10px;">
<h3>Export</h3>
<div style="display: flex; gap: 8px;">
<button class="preset-btn" onclick="WiFiMode.exportData('csv')" style="flex: 1;">
Export CSV
</button>
<button class="preset-btn" onclick="WiFiMode.exportData('json')" style="flex: 1;">
Export JSON
</button>
</div>
</div>
</div>

187
utils/wifi/__init__.py Normal file
View File

@@ -0,0 +1,187 @@
"""
WiFi scanning package for INTERCEPT.
Provides unified WiFi scanning with dual-mode architecture:
- Quick Scan: Uses system tools (nmcli, iw, iwlist, airport) without monitor mode
- Deep Scan: Uses airodump-ng with monitor mode for clients and probes
Also includes channel analysis, hidden SSID correlation, and network aggregation.
"""
from .models import (
WiFiObservation,
WiFiAccessPoint,
WiFiClient,
WiFiProbeRequest,
WiFiScanResult,
WiFiScanStatus,
WiFiCapabilities,
ChannelStats,
ChannelRecommendation,
)
from .scanner import (
UnifiedWiFiScanner,
get_wifi_scanner,
reset_wifi_scanner,
)
from .constants import (
# Bands
BAND_2_4_GHZ,
BAND_5_GHZ,
BAND_6_GHZ,
BAND_UNKNOWN,
# Channels
CHANNELS_2_4_GHZ,
CHANNELS_5_GHZ,
CHANNELS_6_GHZ,
NON_OVERLAPPING_2_4_GHZ,
NON_OVERLAPPING_5_GHZ,
# Security
SECURITY_OPEN,
SECURITY_WEP,
SECURITY_WPA,
SECURITY_WPA2,
SECURITY_WPA3,
SECURITY_WPA_WPA2,
SECURITY_WPA2_WPA3,
SECURITY_ENTERPRISE,
SECURITY_UNKNOWN,
# Cipher
CIPHER_NONE,
CIPHER_WEP,
CIPHER_TKIP,
CIPHER_CCMP,
CIPHER_GCMP,
CIPHER_UNKNOWN,
# Auth
AUTH_OPEN,
AUTH_PSK,
AUTH_SAE,
AUTH_EAP,
AUTH_OWE,
AUTH_UNKNOWN,
# Signal bands
SIGNAL_STRONG,
SIGNAL_MEDIUM,
SIGNAL_WEAK,
SIGNAL_VERY_WEAK,
SIGNAL_UNKNOWN,
# Proximity bands (consistent with Bluetooth)
PROXIMITY_IMMEDIATE,
PROXIMITY_NEAR,
PROXIMITY_FAR,
PROXIMITY_UNKNOWN,
# Scan modes
SCAN_MODE_QUICK,
SCAN_MODE_DEEP,
# Helper functions
get_band_from_channel,
get_band_from_frequency,
get_channel_from_frequency,
get_signal_band,
get_proximity_band,
get_vendor_from_mac,
)
from .channel_analyzer import (
ChannelAnalyzer,
analyze_channels,
)
from .hidden_ssid import (
HiddenSSIDCorrelator,
get_hidden_correlator,
)
__all__ = [
# Main scanner
'UnifiedWiFiScanner',
'get_wifi_scanner',
'reset_wifi_scanner',
# Models
'WiFiObservation',
'WiFiAccessPoint',
'WiFiClient',
'WiFiProbeRequest',
'WiFiScanResult',
'WiFiScanStatus',
'WiFiCapabilities',
'ChannelStats',
'ChannelRecommendation',
# Channel analysis
'ChannelAnalyzer',
'analyze_channels',
# Hidden SSID correlation
'HiddenSSIDCorrelator',
'get_hidden_correlator',
# Constants - Bands
'BAND_2_4_GHZ',
'BAND_5_GHZ',
'BAND_6_GHZ',
'BAND_UNKNOWN',
# Constants - Channels
'CHANNELS_2_4_GHZ',
'CHANNELS_5_GHZ',
'CHANNELS_6_GHZ',
'NON_OVERLAPPING_2_4_GHZ',
'NON_OVERLAPPING_5_GHZ',
# Constants - Security
'SECURITY_OPEN',
'SECURITY_WEP',
'SECURITY_WPA',
'SECURITY_WPA2',
'SECURITY_WPA3',
'SECURITY_WPA_WPA2',
'SECURITY_WPA2_WPA3',
'SECURITY_ENTERPRISE',
'SECURITY_UNKNOWN',
# Constants - Cipher
'CIPHER_NONE',
'CIPHER_WEP',
'CIPHER_TKIP',
'CIPHER_CCMP',
'CIPHER_GCMP',
'CIPHER_UNKNOWN',
# Constants - Auth
'AUTH_OPEN',
'AUTH_PSK',
'AUTH_SAE',
'AUTH_EAP',
'AUTH_OWE',
'AUTH_UNKNOWN',
# Constants - Signal bands
'SIGNAL_STRONG',
'SIGNAL_MEDIUM',
'SIGNAL_WEAK',
'SIGNAL_VERY_WEAK',
'SIGNAL_UNKNOWN',
# Constants - Proximity bands
'PROXIMITY_IMMEDIATE',
'PROXIMITY_NEAR',
'PROXIMITY_FAR',
'PROXIMITY_UNKNOWN',
# Constants - Scan modes
'SCAN_MODE_QUICK',
'SCAN_MODE_DEEP',
# Helper functions
'get_band_from_channel',
'get_band_from_frequency',
'get_channel_from_frequency',
'get_signal_band',
'get_proximity_band',
'get_vendor_from_mac',
]

View File

@@ -0,0 +1,295 @@
"""
WiFi channel utilization analysis and recommendations.
Analyzes channel congestion based on:
- Number of access points per channel
- Number of clients per channel
- Signal strength (stronger = more interference)
- Channel overlap effects
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from .constants import (
BAND_2_4_GHZ,
BAND_5_GHZ,
BAND_6_GHZ,
CHANNELS_2_4_GHZ,
CHANNELS_5_GHZ,
CHANNELS_6_GHZ,
NON_OVERLAPPING_2_4_GHZ,
NON_OVERLAPPING_5_GHZ,
CHANNEL_FREQUENCIES,
CHANNEL_WEIGHT_AP_COUNT,
CHANNEL_WEIGHT_CLIENT_COUNT,
CHANNEL_RSSI_INTERFERENCE_FACTOR,
get_band_from_channel,
)
from .models import WiFiAccessPoint, ChannelStats, ChannelRecommendation
logger = logging.getLogger(__name__)
# DFS channels (Dynamic Frequency Selection) - require radar detection
DFS_CHANNELS_5_GHZ = list(range(52, 65)) + list(range(100, 145))
@dataclass
class ChannelScore:
"""Internal scoring for a channel."""
channel: int
band: str
ap_count: int = 0
client_count: int = 0
rssi_sum: float = 0.0
rssi_count: int = 0
overlap_penalty: float = 0.0
class ChannelAnalyzer:
"""
Analyzes WiFi channel utilization and provides recommendations.
Uses a scoring algorithm that considers:
1. AP density (60% weight by default)
2. Client density (40% weight by default)
3. Signal strength adjustment (stronger signals = more interference)
4. Channel overlap effects for 2.4 GHz
"""
def __init__(
self,
ap_weight: float = CHANNEL_WEIGHT_AP_COUNT,
client_weight: float = CHANNEL_WEIGHT_CLIENT_COUNT,
rssi_factor: float = CHANNEL_RSSI_INTERFERENCE_FACTOR,
):
"""
Initialize channel analyzer.
Args:
ap_weight: Weight for AP count in utilization score (0-1).
client_weight: Weight for client count in utilization score (0-1).
rssi_factor: Factor for RSSI-based interference adjustment.
"""
self.ap_weight = ap_weight
self.client_weight = client_weight
self.rssi_factor = rssi_factor
def analyze(
self,
access_points: list[WiFiAccessPoint],
include_dfs: bool = False,
) -> tuple[list[ChannelStats], list[ChannelRecommendation]]:
"""
Analyze channel utilization from access point data.
Args:
access_points: List of discovered access points.
include_dfs: Whether to include DFS channels in recommendations.
Returns:
Tuple of (channel_stats, recommendations).
"""
# Build per-channel scores
scores: dict[int, ChannelScore] = {}
for ap in access_points:
if ap.channel is None:
continue
channel = ap.channel
if channel not in scores:
scores[channel] = ChannelScore(
channel=channel,
band=get_band_from_channel(channel),
)
score = scores[channel]
score.ap_count += 1
score.client_count += ap.client_count
if ap.rssi_current is not None:
score.rssi_sum += ap.rssi_current
score.rssi_count += 1
# Calculate overlap penalties for 2.4 GHz
self._calculate_overlap_penalties(scores)
# Convert to ChannelStats
channel_stats = self._build_channel_stats(scores)
# Generate recommendations
recommendations = self._generate_recommendations(
scores, access_points, include_dfs
)
return channel_stats, recommendations
def _calculate_overlap_penalties(self, scores: dict[int, ChannelScore]):
"""Calculate overlap penalties for 2.4 GHz channels."""
# In 2.4 GHz, channels overlap: each channel is 22 MHz wide
# but only 5 MHz apart. Channels 1, 6, 11 don't overlap.
#
# Channel overlap:
# - Adjacent channel (+/- 1): 75% overlap
# - 2 channels apart: 50% overlap
# - 3 channels apart: 25% overlap
# - 4 channels apart: ~12% overlap
# - 5+ channels apart: no overlap
overlap_factors = {1: 0.75, 2: 0.50, 3: 0.25, 4: 0.12}
for channel, score in scores.items():
if score.band != BAND_2_4_GHZ:
continue
penalty = 0.0
for other_channel, other_score in scores.items():
if other_channel == channel or other_score.band != BAND_2_4_GHZ:
continue
distance = abs(channel - other_channel)
if distance in overlap_factors:
# Penalty based on APs on overlapping channel
overlap = overlap_factors[distance]
penalty += other_score.ap_count * overlap * 0.5
score.overlap_penalty = penalty
def _build_channel_stats(self, scores: dict[int, ChannelScore]) -> list[ChannelStats]:
"""Build ChannelStats from scores."""
stats_list = []
for channel, score in sorted(scores.items()):
rssi_avg = None
if score.rssi_count > 0:
rssi_avg = score.rssi_sum / score.rssi_count
# Calculate utilization score
utilization = self._calculate_utilization(score)
stats = ChannelStats(
channel=channel,
band=score.band,
frequency_mhz=CHANNEL_FREQUENCIES.get(channel),
ap_count=score.ap_count,
client_count=score.client_count,
rssi_avg=rssi_avg,
utilization_score=utilization,
)
stats_list.append(stats)
return stats_list
def _calculate_utilization(self, score: ChannelScore) -> float:
"""Calculate channel utilization score (0.0-1.0, lower is better)."""
# Base score from AP and client counts
ap_score = score.ap_count * self.ap_weight
client_score = score.client_count * self.client_weight
# RSSI adjustment: stronger signals = more interference
rssi_adjustment = 0.0
if score.rssi_count > 0:
avg_rssi = score.rssi_sum / score.rssi_count
# Normalize: -30 dBm (very strong) -> 1.0, -100 dBm (weak) -> 0.0
rssi_normalized = (avg_rssi + 100) / 70
rssi_adjustment = max(0, rssi_normalized) * self.rssi_factor * score.ap_count
# Overlap penalty (already scaled)
overlap_score = score.overlap_penalty
# Total score
total = ap_score + client_score + rssi_adjustment + overlap_score
# Normalize to 0.0-1.0 range (cap at reasonable maximum)
normalized = min(1.0, total / 10.0)
return normalized
def _generate_recommendations(
self,
scores: dict[int, ChannelScore],
access_points: list[WiFiAccessPoint],
include_dfs: bool,
) -> list[ChannelRecommendation]:
"""Generate channel recommendations."""
recommendations = []
# Score all non-overlapping channels
candidate_channels = []
# 2.4 GHz non-overlapping
for channel in NON_OVERLAPPING_2_4_GHZ:
candidate_channels.append((channel, BAND_2_4_GHZ, False))
# 5 GHz non-DFS
for channel in NON_OVERLAPPING_5_GHZ:
is_dfs = channel in DFS_CHANNELS_5_GHZ
if is_dfs and not include_dfs:
continue
candidate_channels.append((channel, BAND_5_GHZ, is_dfs))
# 5 GHz DFS channels (if requested)
if include_dfs:
for channel in DFS_CHANNELS_5_GHZ:
if channel not in NON_OVERLAPPING_5_GHZ:
candidate_channels.append((channel, BAND_5_GHZ, True))
# Score each candidate
for channel, band, is_dfs in candidate_channels:
score = scores.get(channel)
if score:
utilization = self._calculate_utilization(score)
ap_count = score.ap_count
else:
utilization = 0.0
ap_count = 0
# Build reason string
if ap_count == 0:
reason = "No APs detected - clear channel"
elif ap_count == 1:
reason = f"1 AP on channel"
else:
reason = f"{ap_count} APs on channel"
if is_dfs:
reason += " (DFS - radar detection required)"
recommendations.append(ChannelRecommendation(
channel=channel,
band=band,
score=utilization,
reason=reason,
is_dfs=is_dfs,
))
# Sort by score (lower is better), then prefer non-DFS
recommendations.sort(key=lambda r: (r.score, r.is_dfs, r.channel))
return recommendations
# Module-level convenience function
def analyze_channels(
access_points: list[WiFiAccessPoint],
include_dfs: bool = False,
) -> tuple[list[ChannelStats], list[ChannelRecommendation]]:
"""
Analyze channel utilization and get recommendations.
Args:
access_points: List of discovered access points.
include_dfs: Whether to include DFS channels.
Returns:
Tuple of (channel_stats, recommendations).
"""
analyzer = ChannelAnalyzer()
return analyzer.analyze(access_points, include_dfs)

446
utils/wifi/constants.py Normal file
View File

@@ -0,0 +1,446 @@
"""
WiFi-specific constants for the unified scanner.
"""
from __future__ import annotations
# =============================================================================
# SCANNER SETTINGS
# =============================================================================
# Default quick scan timeout in seconds
DEFAULT_QUICK_SCAN_TIMEOUT = 15
# Default deep scan channel dwell time (seconds)
DEFAULT_CHANNEL_DWELL_TIME = 2
# Maximum RSSI samples to keep per network
MAX_RSSI_SAMPLES = 300
# Network expiration time (seconds since last seen)
NETWORK_STALE_TIMEOUT = 300 # 5 minutes
# Client expiration time (seconds since last seen)
CLIENT_STALE_TIMEOUT = 180 # 3 minutes
# Probe request retention time (seconds)
PROBE_REQUEST_RETENTION = 600 # 10 minutes
# =============================================================================
# WIFI BANDS
# =============================================================================
BAND_2_4_GHZ = '2.4GHz'
BAND_5_GHZ = '5GHz'
BAND_6_GHZ = '6GHz'
BAND_UNKNOWN = 'unknown'
# =============================================================================
# WIFI BAND CHANNEL MAPPINGS
# =============================================================================
# 2.4 GHz channels (1-14)
CHANNELS_2_4_GHZ = list(range(1, 15))
# 5 GHz channels (UNII-1, UNII-2A, UNII-2C, UNII-3)
CHANNELS_5_GHZ = [
# UNII-1 (5150-5250 MHz)
36, 40, 44, 48,
# UNII-2A (5250-5350 MHz) - DFS
52, 56, 60, 64,
# UNII-2C (5470-5725 MHz) - DFS
100, 104, 108, 112, 116, 120, 124, 128, 132, 136, 140, 144,
# UNII-3 (5725-5850 MHz)
149, 153, 157, 161, 165,
]
# 6 GHz channels (Wi-Fi 6E)
CHANNELS_6_GHZ = [
1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49, 53, 57, 61, 65, 69, 73,
77, 81, 85, 89, 93, 97, 101, 105, 109, 113, 117, 121, 125, 129, 133, 137,
141, 145, 149, 153, 157, 161, 165, 169, 173, 177, 181, 185, 189, 193, 197,
201, 205, 209, 213, 217, 221, 225, 229, 233
]
# Non-overlapping channels for recommendations
NON_OVERLAPPING_2_4_GHZ = [1, 6, 11]
NON_OVERLAPPING_5_GHZ = [36, 40, 44, 48, 149, 153, 157, 161, 165] # Non-DFS
# Channel to frequency mappings (MHz)
CHANNEL_FREQUENCIES = {
# 2.4 GHz
1: 2412, 2: 2417, 3: 2422, 4: 2427, 5: 2432, 6: 2437, 7: 2442,
8: 2447, 9: 2452, 10: 2457, 11: 2462, 12: 2467, 13: 2472, 14: 2484,
# 5 GHz
36: 5180, 40: 5200, 44: 5220, 48: 5240,
52: 5260, 56: 5280, 60: 5300, 64: 5320,
100: 5500, 104: 5520, 108: 5540, 112: 5560, 116: 5580,
120: 5600, 124: 5620, 128: 5640, 132: 5660, 136: 5680, 140: 5700, 144: 5720,
149: 5745, 153: 5765, 157: 5785, 161: 5805, 165: 5825,
}
# Frequency to channel reverse mapping
FREQUENCY_CHANNELS = {v: k for k, v in CHANNEL_FREQUENCIES.items()}
def get_band_from_channel(channel: int) -> str:
"""Get WiFi band from channel number."""
if 1 <= channel <= 14:
return BAND_2_4_GHZ
elif channel in CHANNELS_5_GHZ:
return BAND_5_GHZ
elif channel in CHANNELS_6_GHZ:
return BAND_6_GHZ
return BAND_UNKNOWN
def get_band_from_frequency(frequency_mhz: int) -> str:
"""Get WiFi band from frequency in MHz."""
if 2400 <= frequency_mhz <= 2500:
return BAND_2_4_GHZ
elif 5150 <= frequency_mhz <= 5850:
return BAND_5_GHZ
elif 5925 <= frequency_mhz <= 7125:
return BAND_6_GHZ
return BAND_UNKNOWN
def get_channel_from_frequency(frequency_mhz: int) -> int | None:
"""Get channel number from frequency in MHz."""
return FREQUENCY_CHANNELS.get(frequency_mhz)
# =============================================================================
# SECURITY TYPES
# =============================================================================
SECURITY_OPEN = 'Open'
SECURITY_WEP = 'WEP'
SECURITY_WPA = 'WPA'
SECURITY_WPA2 = 'WPA2'
SECURITY_WPA3 = 'WPA3'
SECURITY_WPA_WPA2 = 'WPA/WPA2'
SECURITY_WPA2_WPA3 = 'WPA2/WPA3'
SECURITY_ENTERPRISE = 'Enterprise'
SECURITY_UNKNOWN = 'Unknown'
# Security type priority (higher = more secure)
SECURITY_PRIORITY = {
SECURITY_OPEN: 0,
SECURITY_WEP: 1,
SECURITY_WPA: 2,
SECURITY_WPA_WPA2: 3,
SECURITY_WPA2: 4,
SECURITY_WPA2_WPA3: 5,
SECURITY_WPA3: 6,
SECURITY_ENTERPRISE: 7,
SECURITY_UNKNOWN: -1,
}
# =============================================================================
# CIPHER TYPES
# =============================================================================
CIPHER_NONE = 'None'
CIPHER_WEP = 'WEP'
CIPHER_TKIP = 'TKIP'
CIPHER_CCMP = 'CCMP'
CIPHER_GCMP = 'GCMP'
CIPHER_UNKNOWN = 'Unknown'
# =============================================================================
# AUTHENTICATION TYPES
# =============================================================================
AUTH_OPEN = 'Open'
AUTH_PSK = 'PSK'
AUTH_SAE = 'SAE'
AUTH_EAP = 'EAP'
AUTH_OWE = 'OWE'
AUTH_UNKNOWN = 'Unknown'
# =============================================================================
# CHANNEL WIDTH
# =============================================================================
WIDTH_20_MHZ = '20MHz'
WIDTH_40_MHZ = '40MHz'
WIDTH_80_MHZ = '80MHz'
WIDTH_160_MHZ = '160MHz'
WIDTH_320_MHZ = '320MHz'
WIDTH_UNKNOWN = 'Unknown'
# =============================================================================
# SIGNAL STRENGTH BANDS (for proximity radar)
# =============================================================================
SIGNAL_STRONG = 'strong' # >= -50 dBm
SIGNAL_MEDIUM = 'medium' # -50 to -70 dBm
SIGNAL_WEAK = 'weak' # -70 to -85 dBm
SIGNAL_VERY_WEAK = 'very_weak' # < -85 dBm
SIGNAL_UNKNOWN = 'unknown'
# RSSI thresholds for signal bands
RSSI_STRONG = -50
RSSI_MEDIUM = -70
RSSI_WEAK = -85
def get_signal_band(rssi: int | None) -> str:
"""Get signal band from RSSI value."""
if rssi is None:
return SIGNAL_UNKNOWN
if rssi >= RSSI_STRONG:
return SIGNAL_STRONG
elif rssi >= RSSI_MEDIUM:
return SIGNAL_MEDIUM
elif rssi >= RSSI_WEAK:
return SIGNAL_WEAK
return SIGNAL_VERY_WEAK
# =============================================================================
# PROXIMITY BANDS (consistent with Bluetooth)
# =============================================================================
PROXIMITY_IMMEDIATE = 'immediate' # < 3m
PROXIMITY_NEAR = 'near' # 3-10m
PROXIMITY_FAR = 'far' # > 10m
PROXIMITY_UNKNOWN = 'unknown'
# RSSI thresholds for proximity band classification
PROXIMITY_RSSI_IMMEDIATE = -55 # >= -55 dBm -> immediate
PROXIMITY_RSSI_NEAR = -70 # >= -70 dBm -> near
def get_proximity_band(rssi: int | None) -> str:
"""Get proximity band from RSSI value."""
if rssi is None:
return PROXIMITY_UNKNOWN
if rssi >= PROXIMITY_RSSI_IMMEDIATE:
return PROXIMITY_IMMEDIATE
elif rssi >= PROXIMITY_RSSI_NEAR:
return PROXIMITY_NEAR
return PROXIMITY_FAR
# =============================================================================
# DISTANCE ESTIMATION (WiFi-specific)
# =============================================================================
# Path-loss exponent for indoor WiFi (typically 2.5-4.0)
WIFI_PATH_LOSS_EXPONENT = 3.0
# Reference RSSI at 1 meter (typical WiFi AP)
WIFI_RSSI_AT_1M = -40
# EMA smoothing alpha for RSSI
WIFI_EMA_ALPHA = 0.3
# =============================================================================
# SCAN MODES
# =============================================================================
SCAN_MODE_QUICK = 'quick' # Uses system tools (no monitor mode)
SCAN_MODE_DEEP = 'deep' # Uses airodump-ng (monitor mode required)
# =============================================================================
# TOOL DETECTION
# =============================================================================
# Quick scan tools (by platform priority)
QUICK_SCAN_TOOLS_LINUX = ['nmcli', 'iw', 'iwlist']
QUICK_SCAN_TOOLS_DARWIN = ['airport']
# Deep scan tools
DEEP_SCAN_TOOLS = ['airodump-ng']
# Monitor mode tools
MONITOR_MODE_TOOLS = ['airmon-ng', 'iw']
# Tool command timeouts (seconds)
TOOL_TIMEOUT_QUICK = 30.0
TOOL_TIMEOUT_DETECT = 5.0
# =============================================================================
# AIRODUMP-NG SETTINGS
# =============================================================================
AIRODUMP_OUTPUT_PREFIX = 'airodump_wifi'
AIRODUMP_POLL_INTERVAL = 1.0 # seconds between CSV reads
# =============================================================================
# HEURISTIC FLAGS
# =============================================================================
HEURISTIC_HIDDEN = 'hidden'
HEURISTIC_ROGUE_AP = 'rogue_ap'
HEURISTIC_EVIL_TWIN = 'evil_twin'
HEURISTIC_BEACON_FLOOD = 'beacon_flood'
HEURISTIC_WEAK_SECURITY = 'weak_security'
HEURISTIC_DEAUTH_DETECTED = 'deauth_detected'
HEURISTIC_NEW = 'new'
HEURISTIC_PERSISTENT = 'persistent'
HEURISTIC_STRONG_STABLE = 'strong_stable'
# Thresholds
BEACON_FLOOD_THRESHOLD = 50 # Same BSSID seen > 50 times/minute
PERSISTENT_MIN_SEEN = 10
PERSISTENT_WINDOW_SECONDS = 300
STRONG_RSSI_THRESHOLD = -50
STABLE_VARIANCE_THRESHOLD = 5.0
# =============================================================================
# COMMON VENDOR OUI PREFIXES (first 3 bytes of MAC)
# =============================================================================
VENDOR_OUIS = {
'00:00:5E': 'IANA',
'00:03:93': 'Apple',
'00:0A:95': 'Apple',
'00:0D:93': 'Apple',
'00:11:24': 'Apple',
'00:14:51': 'Apple',
'00:16:CB': 'Apple',
'00:17:F2': 'Apple',
'00:19:E3': 'Apple',
'00:1B:63': 'Apple',
'00:1C:B3': 'Apple',
'00:1D:4F': 'Apple',
'00:1E:52': 'Apple',
'00:1E:C2': 'Apple',
'00:1F:5B': 'Apple',
'00:1F:F3': 'Apple',
'00:21:E9': 'Apple',
'00:22:41': 'Apple',
'00:23:12': 'Apple',
'00:23:32': 'Apple',
'00:23:6C': 'Apple',
'00:23:DF': 'Apple',
'00:24:36': 'Apple',
'00:25:00': 'Apple',
'00:25:4B': 'Apple',
'00:25:BC': 'Apple',
'00:26:08': 'Apple',
'00:26:4A': 'Apple',
'00:26:B0': 'Apple',
'00:26:BB': 'Apple',
'00:50:F2': 'Microsoft',
'00:15:5D': 'Microsoft',
'00:17:FA': 'Microsoft',
'00:1D:D8': 'Microsoft',
'00:50:56': 'VMware',
'00:0C:29': 'VMware',
'00:05:69': 'VMware',
'08:00:27': 'VirtualBox',
'00:1C:42': 'Parallels',
'00:16:3E': 'Xen',
'DC:A6:32': 'Raspberry Pi',
'B8:27:EB': 'Raspberry Pi',
'E4:5F:01': 'Raspberry Pi',
'28:CD:C1': 'Raspberry Pi',
'00:1A:11': 'Google',
'00:1A:22': 'Google',
'3C:5A:B4': 'Google',
'54:60:09': 'Google',
'94:EB:2C': 'Google',
'F4:F5:D8': 'Google',
'00:17:C4': 'Netgear',
'00:1B:2F': 'Netgear',
'00:1E:2A': 'Netgear',
'00:22:3F': 'Netgear',
'00:24:B2': 'Netgear',
'00:26:F2': 'Netgear',
'00:18:F8': 'Cisco',
'00:1A:A1': 'Cisco',
'00:1B:0C': 'Cisco',
'00:1B:D4': 'Cisco',
'00:1C:0E': 'Cisco',
'00:1C:57': 'Cisco',
'00:40:96': 'Cisco',
'00:50:54': 'Cisco',
'00:60:5C': 'Cisco',
'E8:65:D4': 'Ubiquiti',
'FC:EC:DA': 'Ubiquiti',
'00:27:22': 'Ubiquiti',
'04:18:D6': 'Ubiquiti',
'18:E8:29': 'Ubiquiti',
'24:A4:3C': 'Ubiquiti',
'44:D9:E7': 'Ubiquiti',
'68:72:51': 'Ubiquiti',
'74:83:C2': 'Ubiquiti',
'78:8A:20': 'Ubiquiti',
'B4:FB:E4': 'Ubiquiti',
'F0:9F:C2': 'Ubiquiti',
'00:0C:F1': 'Intel',
'00:13:02': 'Intel',
'00:13:20': 'Intel',
'00:13:CE': 'Intel',
'00:13:E8': 'Intel',
'00:15:00': 'Intel',
'00:15:17': 'Intel',
'00:16:6F': 'Intel',
'00:16:76': 'Intel',
'00:16:EA': 'Intel',
'00:16:EB': 'Intel',
'00:18:DE': 'Intel',
'00:19:D1': 'Intel',
'00:19:D2': 'Intel',
'00:1B:21': 'Intel',
'00:1B:77': 'Intel',
'00:1C:BF': 'Intel',
'00:1D:E0': 'Intel',
'00:1D:E1': 'Intel',
'00:1E:64': 'Intel',
'00:1E:65': 'Intel',
'00:1E:67': 'Intel',
'00:1F:3B': 'Intel',
'00:1F:3C': 'Intel',
'00:20:E0': 'TP-Link',
'00:23:CD': 'TP-Link',
'00:25:86': 'TP-Link',
'00:27:19': 'TP-Link',
'14:CC:20': 'TP-Link',
'14:CF:92': 'TP-Link',
'18:A6:F7': 'TP-Link',
'1C:3B:F3': 'TP-Link',
'30:B5:C2': 'TP-Link',
'50:C7:BF': 'TP-Link',
'54:C8:0F': 'TP-Link',
'60:E3:27': 'TP-Link',
'64:56:01': 'TP-Link',
'64:66:B3': 'TP-Link',
'64:70:02': 'TP-Link',
}
def get_vendor_from_mac(mac: str) -> str | None:
"""Get vendor name from MAC address OUI."""
if not mac:
return None
# Normalize MAC format
mac_upper = mac.upper().replace('-', ':')
oui = mac_upper[:8]
return VENDOR_OUIS.get(oui)
# =============================================================================
# HIDDEN SSID CORRELATION
# =============================================================================
# Time window for correlating probe requests with hidden AP associations
HIDDEN_CORRELATION_WINDOW_SECONDS = 60
# Minimum confidence for hidden SSID revelation
HIDDEN_MIN_CORRELATION_CONFIDENCE = 0.7
# =============================================================================
# CHANNEL ANALYSIS
# =============================================================================
# Weights for channel utilization scoring
CHANNEL_WEIGHT_AP_COUNT = 0.6
CHANNEL_WEIGHT_CLIENT_COUNT = 0.4
# RSSI adjustment factor (stronger signals = more interference)
CHANNEL_RSSI_INTERFERENCE_FACTOR = 0.1

327
utils/wifi/hidden_ssid.py Normal file
View File

@@ -0,0 +1,327 @@
"""
Hidden SSID correlation engine.
Correlates probe requests from clients with hidden access points to reveal
the actual SSID of hidden networks.
Strategy:
1. Track probe requests with source MACs and probed SSIDs
2. Track hidden networks (empty ESSID) with their BSSIDs
3. When a client probes for an SSID and then associates with a hidden AP
within a time window, correlate the SSID to the hidden AP
4. Also correlate when the same client is seen both probing for an SSID
and sending data to a hidden AP
"""
from __future__ import annotations
import logging
import threading
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional
from .constants import (
HIDDEN_CORRELATION_WINDOW_SECONDS,
HIDDEN_MIN_CORRELATION_CONFIDENCE,
)
logger = logging.getLogger(__name__)
# Global correlator instance
_correlator_instance: Optional['HiddenSSIDCorrelator'] = None
_correlator_lock = threading.Lock()
@dataclass
class ProbeRecord:
"""Record of a probe request."""
timestamp: datetime
client_mac: str
probed_ssid: str
@dataclass
class AssociationRecord:
"""Record of a client association."""
timestamp: datetime
client_mac: str
bssid: str
@dataclass
class CorrelationResult:
"""Result of an SSID correlation."""
bssid: str
revealed_ssid: str
client_mac: str
confidence: float
correlation_time: datetime
method: str # 'probe_association', 'data_correlation'
class HiddenSSIDCorrelator:
"""
Correlates probe requests with hidden APs to reveal their SSIDs.
Uses time-based correlation: when a client probes for an SSID and
then is seen communicating with a hidden AP, the SSID is likely
that of the hidden network.
"""
def __init__(
self,
correlation_window: float = HIDDEN_CORRELATION_WINDOW_SECONDS,
min_confidence: float = HIDDEN_MIN_CORRELATION_CONFIDENCE,
):
"""
Initialize the correlator.
Args:
correlation_window: Time window for correlation (seconds).
min_confidence: Minimum confidence to report a correlation.
"""
self.correlation_window = correlation_window
self.min_confidence = min_confidence
self._lock = threading.Lock()
# Storage
self._probe_records: list[ProbeRecord] = []
self._association_records: list[AssociationRecord] = []
self._hidden_aps: dict[str, datetime] = {} # BSSID -> last_seen
self._revealed: dict[str, CorrelationResult] = {} # BSSID -> result
# Callbacks
self._on_ssid_revealed: Optional[Callable[[CorrelationResult], None]] = None
def record_probe(self, client_mac: str, probed_ssid: str, timestamp: Optional[datetime] = None):
"""
Record a probe request.
Args:
client_mac: MAC address of the probing client.
probed_ssid: SSID being probed for.
timestamp: Time of the probe (defaults to now).
"""
if not client_mac or not probed_ssid:
return
timestamp = timestamp or datetime.now()
client_mac = client_mac.upper()
with self._lock:
self._probe_records.append(ProbeRecord(
timestamp=timestamp,
client_mac=client_mac,
probed_ssid=probed_ssid,
))
# Prune old records
self._prune_records()
# Check for correlations with known hidden APs
self._check_correlations()
def record_association(self, client_mac: str, bssid: str, timestamp: Optional[datetime] = None):
"""
Record a client association with an AP.
Args:
client_mac: MAC address of the client.
bssid: BSSID of the AP.
timestamp: Time of the association (defaults to now).
"""
if not client_mac or not bssid:
return
timestamp = timestamp or datetime.now()
client_mac = client_mac.upper()
bssid = bssid.upper()
with self._lock:
self._association_records.append(AssociationRecord(
timestamp=timestamp,
client_mac=client_mac,
bssid=bssid,
))
# Prune old records
self._prune_records()
# Check for correlations
self._check_correlations()
def record_hidden_ap(self, bssid: str, timestamp: Optional[datetime] = None):
"""
Record a hidden access point (empty SSID).
Args:
bssid: BSSID of the hidden AP.
timestamp: Time when seen (defaults to now).
"""
if not bssid:
return
timestamp = timestamp or datetime.now()
bssid = bssid.upper()
with self._lock:
self._hidden_aps[bssid] = timestamp
# Check for correlations
self._check_correlations()
def get_revealed_ssid(self, bssid: str) -> Optional[str]:
"""
Get the revealed SSID for a hidden AP, if known.
Args:
bssid: BSSID to look up.
Returns:
Revealed SSID or None.
"""
with self._lock:
result = self._revealed.get(bssid.upper())
return result.revealed_ssid if result else None
def get_correlation(self, bssid: str) -> Optional[CorrelationResult]:
"""
Get the full correlation result for a hidden AP.
Args:
bssid: BSSID to look up.
Returns:
CorrelationResult or None.
"""
with self._lock:
return self._revealed.get(bssid.upper())
def get_all_revealed(self) -> dict[str, str]:
"""
Get all revealed SSID mappings.
Returns:
Dict of BSSID -> revealed SSID.
"""
with self._lock:
return {
bssid: result.revealed_ssid
for bssid, result in self._revealed.items()
}
def set_callback(self, callback: Callable[[CorrelationResult], None]):
"""Set callback for when an SSID is revealed."""
self._on_ssid_revealed = callback
def _prune_records(self):
"""Remove records older than the correlation window."""
cutoff = datetime.now() - timedelta(seconds=self.correlation_window * 2)
self._probe_records = [
r for r in self._probe_records
if r.timestamp > cutoff
]
self._association_records = [
r for r in self._association_records
if r.timestamp > cutoff
]
def _check_correlations(self):
"""Check for new SSID correlations."""
now = datetime.now()
window = timedelta(seconds=self.correlation_window)
for bssid in list(self._hidden_aps.keys()):
# Skip if already revealed
if bssid in self._revealed:
continue
# Find associations with this hidden AP
relevant_associations = [
a for a in self._association_records
if a.bssid == bssid and (now - a.timestamp) <= window
]
if not relevant_associations:
continue
# For each associated client, look for recent probes
for assoc in relevant_associations:
client_probes = [
p for p in self._probe_records
if p.client_mac == assoc.client_mac
and abs((p.timestamp - assoc.timestamp).total_seconds()) <= self.correlation_window
]
if not client_probes:
continue
# Use the most recent probe from this client
latest_probe = max(client_probes, key=lambda p: p.timestamp)
# Calculate confidence based on timing
time_diff = abs((latest_probe.timestamp - assoc.timestamp).total_seconds())
confidence = 1.0 - (time_diff / self.correlation_window)
confidence = max(0.0, min(1.0, confidence))
if confidence >= self.min_confidence:
result = CorrelationResult(
bssid=bssid,
revealed_ssid=latest_probe.probed_ssid,
client_mac=assoc.client_mac,
confidence=confidence,
correlation_time=now,
method='probe_association',
)
self._revealed[bssid] = result
logger.info(
f"Hidden SSID revealed: {bssid} -> '{latest_probe.probed_ssid}' "
f"(confidence: {confidence:.2f})"
)
# Callback
if self._on_ssid_revealed:
try:
self._on_ssid_revealed(result)
except Exception as e:
logger.debug(f"SSID reveal callback error: {e}")
break # Found correlation, move to next AP
def clear(self):
"""Clear all stored data."""
with self._lock:
self._probe_records.clear()
self._association_records.clear()
self._hidden_aps.clear()
self._revealed.clear()
def get_hidden_correlator(
correlation_window: float = HIDDEN_CORRELATION_WINDOW_SECONDS,
min_confidence: float = HIDDEN_MIN_CORRELATION_CONFIDENCE,
) -> HiddenSSIDCorrelator:
"""
Get or create the global hidden SSID correlator instance.
Args:
correlation_window: Time window for correlation.
min_confidence: Minimum confidence threshold.
Returns:
HiddenSSIDCorrelator instance.
"""
global _correlator_instance
with _correlator_lock:
if _correlator_instance is None:
_correlator_instance = HiddenSSIDCorrelator(
correlation_window=correlation_window,
min_confidence=min_confidence,
)
return _correlator_instance

653
utils/wifi/models.py Normal file
View File

@@ -0,0 +1,653 @@
"""
WiFi data models for the unified scanner.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from .constants import (
BAND_UNKNOWN,
SECURITY_UNKNOWN,
CIPHER_UNKNOWN,
AUTH_UNKNOWN,
WIDTH_UNKNOWN,
SIGNAL_UNKNOWN,
PROXIMITY_UNKNOWN,
SCAN_MODE_QUICK,
get_band_from_channel,
get_signal_band,
get_proximity_band,
get_vendor_from_mac,
)
@dataclass
class WiFiObservation:
"""Represents a single WiFi access point scan result."""
timestamp: datetime
bssid: str
essid: Optional[str] = None
channel: Optional[int] = None
frequency_mhz: Optional[int] = None
rssi: Optional[int] = None
# Security
security: str = SECURITY_UNKNOWN
cipher: str = CIPHER_UNKNOWN
auth: str = AUTH_UNKNOWN
# Additional info
width: str = WIDTH_UNKNOWN
beacon_count: int = 0
data_count: int = 0
@property
def is_hidden(self) -> bool:
"""Check if this is a hidden network."""
return not self.essid or self.essid.strip() == ''
@property
def band(self) -> str:
"""Get WiFi band from channel."""
if self.channel:
return get_band_from_channel(self.channel)
return BAND_UNKNOWN
@property
def vendor(self) -> Optional[str]:
"""Get vendor name from BSSID."""
return get_vendor_from_mac(self.bssid)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'timestamp': self.timestamp.isoformat(),
'bssid': self.bssid,
'essid': self.essid,
'is_hidden': self.is_hidden,
'channel': self.channel,
'frequency_mhz': self.frequency_mhz,
'band': self.band,
'rssi': self.rssi,
'security': self.security,
'cipher': self.cipher,
'auth': self.auth,
'width': self.width,
'beacon_count': self.beacon_count,
'data_count': self.data_count,
'vendor': self.vendor,
}
@dataclass
class WiFiAccessPoint:
"""Aggregated WiFi access point data over time."""
# Identity
bssid: str
essid: Optional[str] = None
is_hidden: bool = False
revealed_essid: Optional[str] = None # Revealed through correlation
# Radio info
channel: Optional[int] = None
frequency_mhz: Optional[int] = None
band: str = BAND_UNKNOWN
width: str = WIDTH_UNKNOWN
# Signal aggregation
rssi_samples: list[tuple[datetime, int]] = field(default_factory=list)
rssi_current: Optional[int] = None
rssi_median: Optional[float] = None
rssi_min: Optional[int] = None
rssi_max: Optional[int] = None
rssi_variance: Optional[float] = None
rssi_ema: Optional[float] = None
# Proximity/signal bands
signal_band: str = SIGNAL_UNKNOWN
proximity_band: str = PROXIMITY_UNKNOWN
estimated_distance_m: Optional[float] = None
distance_confidence: float = 0.0
# Security
security: str = SECURITY_UNKNOWN
cipher: str = CIPHER_UNKNOWN
auth: str = AUTH_UNKNOWN
# Timestamps
first_seen: datetime = field(default_factory=datetime.now)
last_seen: datetime = field(default_factory=datetime.now)
seen_count: int = 0
seen_rate: float = 0.0 # Observations per minute
# Traffic stats
beacon_count: int = 0
data_count: int = 0
client_count: int = 0
# Metadata
vendor: Optional[str] = None
# Heuristic flags
heuristic_flags: list[str] = field(default_factory=list)
is_new: bool = False
is_persistent: bool = False
is_strong_stable: bool = False
# Baseline tracking
in_baseline: bool = False
baseline_id: Optional[int] = None
@property
def display_name(self) -> str:
"""Get display name (revealed SSID, ESSID, or BSSID)."""
if self.revealed_essid:
return f"{self.revealed_essid} (revealed)"
if self.essid and not self.is_hidden:
return self.essid
return f"[Hidden] {self.bssid}"
@property
def age_seconds(self) -> float:
"""Seconds since last seen."""
return (datetime.now() - self.last_seen).total_seconds()
@property
def duration_seconds(self) -> float:
"""Total duration from first to last seen."""
return (self.last_seen - self.first_seen).total_seconds()
def get_rssi_history(self, max_points: int = 50) -> list[dict]:
"""Get RSSI history for visualization."""
if not self.rssi_samples:
return []
samples = self.rssi_samples[-max_points:]
return [
{'timestamp': ts.isoformat(), 'rssi': rssi}
for ts, rssi in samples
]
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
# Identity
'bssid': self.bssid,
'essid': self.essid,
'display_name': self.display_name,
'is_hidden': self.is_hidden,
'revealed_essid': self.revealed_essid,
# Radio
'channel': self.channel,
'frequency_mhz': self.frequency_mhz,
'band': self.band,
'width': self.width,
# Signal
'rssi_current': self.rssi_current,
'rssi_median': round(self.rssi_median, 1) if self.rssi_median else None,
'rssi_min': self.rssi_min,
'rssi_max': self.rssi_max,
'rssi_variance': round(self.rssi_variance, 2) if self.rssi_variance else None,
'rssi_ema': round(self.rssi_ema, 1) if self.rssi_ema else None,
'rssi_history': self.get_rssi_history(),
# Proximity
'signal_band': self.signal_band,
'proximity_band': self.proximity_band,
'estimated_distance_m': round(self.estimated_distance_m, 2) if self.estimated_distance_m else None,
'distance_confidence': round(self.distance_confidence, 2),
# Security
'security': self.security,
'cipher': self.cipher,
'auth': self.auth,
# Timestamps
'first_seen': self.first_seen.isoformat(),
'last_seen': self.last_seen.isoformat(),
'age_seconds': round(self.age_seconds, 1),
'duration_seconds': round(self.duration_seconds, 1),
'seen_count': self.seen_count,
'seen_rate': round(self.seen_rate, 2),
# Traffic
'beacon_count': self.beacon_count,
'data_count': self.data_count,
'client_count': self.client_count,
# Metadata
'vendor': self.vendor,
# Heuristics
'heuristic_flags': self.heuristic_flags,
'heuristics': {
'is_new': self.is_new,
'is_persistent': self.is_persistent,
'is_strong_stable': self.is_strong_stable,
},
# Baseline
'in_baseline': self.in_baseline,
'baseline_id': self.baseline_id,
}
def to_summary_dict(self) -> dict:
"""Compact dictionary for list views."""
return {
'bssid': self.bssid,
'essid': self.essid,
'display_name': self.display_name,
'is_hidden': self.is_hidden,
'channel': self.channel,
'band': self.band,
'rssi_current': self.rssi_current,
'rssi_median': round(self.rssi_median, 1) if self.rssi_median else None,
'signal_band': self.signal_band,
'proximity_band': self.proximity_band,
'security': self.security,
'vendor': self.vendor,
'client_count': self.client_count,
'last_seen': self.last_seen.isoformat(),
'age_seconds': round(self.age_seconds, 1),
'heuristic_flags': self.heuristic_flags,
'in_baseline': self.in_baseline,
}
def to_legacy_dict(self) -> dict:
"""Convert to legacy format for TSCM compatibility."""
return {
'bssid': self.bssid,
'essid': self.essid or '',
'power': str(self.rssi_current) if self.rssi_current else '-100',
'channel': str(self.channel) if self.channel else '',
'privacy': self.security,
'first_seen': self.first_seen.isoformat() if self.first_seen else '',
'last_seen': self.last_seen.isoformat() if self.last_seen else '',
'beacon_count': str(self.beacon_count),
'lan_ip': '', # Not tracked in new system
}
@dataclass
class WiFiClient:
"""WiFi client (station) observed during scanning."""
# Identity
mac: str
vendor: Optional[str] = None
# Signal
rssi_samples: list[tuple[datetime, int]] = field(default_factory=list)
rssi_current: Optional[int] = None
rssi_median: Optional[float] = None
rssi_min: Optional[int] = None
rssi_max: Optional[int] = None
rssi_ema: Optional[float] = None
# Proximity
signal_band: str = SIGNAL_UNKNOWN
proximity_band: str = PROXIMITY_UNKNOWN
estimated_distance_m: Optional[float] = None
# Association
associated_bssid: Optional[str] = None
is_associated: bool = False
# Probes
probed_ssids: list[str] = field(default_factory=list)
probe_timestamps: dict[str, datetime] = field(default_factory=dict)
# Timestamps
first_seen: datetime = field(default_factory=datetime.now)
last_seen: datetime = field(default_factory=datetime.now)
seen_count: int = 0
# Traffic stats
packets_sent: int = 0
packets_received: int = 0
# Heuristics
heuristic_flags: list[str] = field(default_factory=list)
@property
def age_seconds(self) -> float:
"""Seconds since last seen."""
return (datetime.now() - self.last_seen).total_seconds()
def get_rssi_history(self, max_points: int = 50) -> list[dict]:
"""Get RSSI history for visualization."""
if not self.rssi_samples:
return []
samples = self.rssi_samples[-max_points:]
return [
{'timestamp': ts.isoformat(), 'rssi': rssi}
for ts, rssi in samples
]
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'mac': self.mac,
'vendor': self.vendor,
# Signal
'rssi_current': self.rssi_current,
'rssi_median': round(self.rssi_median, 1) if self.rssi_median else None,
'rssi_min': self.rssi_min,
'rssi_max': self.rssi_max,
'rssi_ema': round(self.rssi_ema, 1) if self.rssi_ema else None,
'rssi_history': self.get_rssi_history(),
# Proximity
'signal_band': self.signal_band,
'proximity_band': self.proximity_band,
'estimated_distance_m': round(self.estimated_distance_m, 2) if self.estimated_distance_m else None,
# Association
'associated_bssid': self.associated_bssid,
'is_associated': self.is_associated,
# Probes
'probed_ssids': self.probed_ssids,
'probe_count': len(self.probed_ssids),
# Timestamps
'first_seen': self.first_seen.isoformat(),
'last_seen': self.last_seen.isoformat(),
'age_seconds': round(self.age_seconds, 1),
'seen_count': self.seen_count,
# Traffic
'packets_sent': self.packets_sent,
'packets_received': self.packets_received,
# Heuristics
'heuristic_flags': self.heuristic_flags,
}
@dataclass
class WiFiProbeRequest:
"""A single probe request captured during scanning."""
timestamp: datetime
client_mac: str
probed_ssid: str
rssi: Optional[int] = None
client_vendor: Optional[str] = None
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'timestamp': self.timestamp.isoformat(),
'client_mac': self.client_mac,
'probed_ssid': self.probed_ssid,
'rssi': self.rssi,
'client_vendor': self.client_vendor,
}
@dataclass
class ChannelStats:
"""Statistics for a single WiFi channel."""
channel: int
band: str = BAND_UNKNOWN
frequency_mhz: Optional[int] = None
# Counts
ap_count: int = 0
client_count: int = 0
# Signal stats
rssi_avg: Optional[float] = None
rssi_min: Optional[int] = None
rssi_max: Optional[int] = None
# Utilization score (0.0-1.0, lower is better)
utilization_score: float = 0.0
# Recommendation rank (1 = best)
recommendation_rank: Optional[int] = None
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'channel': self.channel,
'band': self.band,
'frequency_mhz': self.frequency_mhz,
'ap_count': self.ap_count,
'client_count': self.client_count,
'rssi_avg': round(self.rssi_avg, 1) if self.rssi_avg else None,
'rssi_min': self.rssi_min,
'rssi_max': self.rssi_max,
'utilization_score': round(self.utilization_score, 3),
'recommendation_rank': self.recommendation_rank,
}
@dataclass
class ChannelRecommendation:
"""Channel recommendation with reasoning."""
channel: int
band: str
score: float # Lower is better
reason: str
is_dfs: bool = False
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'channel': self.channel,
'band': self.band,
'score': round(self.score, 3),
'reason': self.reason,
'is_dfs': self.is_dfs,
}
@dataclass
class WiFiScanResult:
"""Complete result from a WiFi scan operation."""
# Discovered entities
access_points: list[WiFiAccessPoint] = field(default_factory=list)
clients: list[WiFiClient] = field(default_factory=list)
probe_requests: list[WiFiProbeRequest] = field(default_factory=list)
# Channel analysis
channel_stats: list[ChannelStats] = field(default_factory=list)
recommendations: list[ChannelRecommendation] = field(default_factory=list)
# Scan metadata
scan_mode: str = SCAN_MODE_QUICK
interface: Optional[str] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
duration_seconds: Optional[float] = None
# Status
is_complete: bool = False
error: Optional[str] = None
warnings: list[str] = field(default_factory=list)
@property
def network_count(self) -> int:
"""Total number of access points found."""
return len(self.access_points)
@property
def client_count(self) -> int:
"""Total number of clients found."""
return len(self.clients)
@property
def hidden_count(self) -> int:
"""Number of hidden networks."""
return sum(1 for ap in self.access_points if ap.is_hidden)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
# Entities
'access_points': [ap.to_dict() for ap in self.access_points],
'clients': [c.to_dict() for c in self.clients],
'probe_requests': [p.to_dict() for p in self.probe_requests],
# Channel analysis
'channel_stats': [cs.to_dict() for cs in self.channel_stats],
'recommendations': [r.to_dict() for r in self.recommendations],
# Metadata
'scan_mode': self.scan_mode,
'interface': self.interface,
'started_at': self.started_at.isoformat() if self.started_at else None,
'completed_at': self.completed_at.isoformat() if self.completed_at else None,
'duration_seconds': round(self.duration_seconds, 2) if self.duration_seconds else None,
# Stats
'network_count': self.network_count,
'client_count': self.client_count,
'hidden_count': self.hidden_count,
# Status
'is_complete': self.is_complete,
'error': self.error,
'warnings': self.warnings,
}
def to_summary_dict(self) -> dict:
"""Compact summary for status endpoints."""
return {
'scan_mode': self.scan_mode,
'interface': self.interface,
'network_count': self.network_count,
'client_count': self.client_count,
'hidden_count': self.hidden_count,
'is_complete': self.is_complete,
'error': self.error,
}
@dataclass
class WiFiScanStatus:
"""Current WiFi scanning status."""
is_scanning: bool = False
scan_mode: str = SCAN_MODE_QUICK
interface: Optional[str] = None
started_at: Optional[datetime] = None
networks_found: int = 0
clients_found: int = 0
error: Optional[str] = None
@property
def elapsed_seconds(self) -> Optional[float]:
"""Seconds since scan started."""
if self.started_at:
return (datetime.now() - self.started_at).total_seconds()
return None
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'is_scanning': self.is_scanning,
'scan_mode': self.scan_mode,
'interface': self.interface,
'started_at': self.started_at.isoformat() if self.started_at else None,
'elapsed_seconds': round(self.elapsed_seconds, 1) if self.elapsed_seconds else None,
'networks_found': self.networks_found,
'clients_found': self.clients_found,
'error': self.error,
}
@dataclass
class WiFiCapabilities:
"""WiFi system capabilities check result."""
# Platform
platform: str = 'unknown' # 'linux', 'darwin', 'windows'
is_root: bool = False
# Interfaces
interfaces: list[dict] = field(default_factory=list)
default_interface: Optional[str] = None
# Quick scan tools
has_nmcli: bool = False
has_iw: bool = False
has_iwlist: bool = False
has_airport: bool = False
preferred_quick_tool: Optional[str] = None
# Deep scan tools
has_airmon_ng: bool = False
has_airodump_ng: bool = False
has_monitor_capable_interface: bool = False
monitor_interface: Optional[str] = None
# Issues
issues: list[str] = field(default_factory=list)
@property
def can_quick_scan(self) -> bool:
"""Whether quick scanning is available."""
return (
self.has_nmcli or
self.has_iw or
self.has_iwlist or
self.has_airport
) and len(self.interfaces) > 0
@property
def can_deep_scan(self) -> bool:
"""Whether deep scanning is available."""
return (
self.has_airmon_ng and
self.has_airodump_ng and
self.has_monitor_capable_interface and
self.is_root
)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
# Status
'available': self.can_quick_scan,
'can_quick_scan': self.can_quick_scan,
'can_deep_scan': self.can_deep_scan,
# Platform
'platform': self.platform,
'is_root': self.is_root,
# Interfaces
'interfaces': self.interfaces,
'default_interface': self.default_interface,
# Quick scan tools
'tools': {
'nmcli': self.has_nmcli,
'iw': self.has_iw,
'iwlist': self.has_iwlist,
'airport': self.has_airport,
'airmon_ng': self.has_airmon_ng,
'airodump_ng': self.has_airodump_ng,
},
'preferred_quick_tool': self.preferred_quick_tool,
# Deep scan
'has_monitor_capable_interface': self.has_monitor_capable_interface,
'monitor_interface': self.monitor_interface,
# Issues
'issues': self.issues,
}

View File

@@ -0,0 +1,19 @@
"""
WiFi scan output parsers.
Each parser converts tool-specific output into WiFiObservation objects.
"""
from .airport import parse_airport_scan
from .nmcli import parse_nmcli_scan
from .iw import parse_iw_scan
from .iwlist import parse_iwlist_scan
from .airodump import parse_airodump_csv
__all__ = [
'parse_airport_scan',
'parse_nmcli_scan',
'parse_iw_scan',
'parse_iwlist_scan',
'parse_airodump_csv',
]

View File

@@ -0,0 +1,392 @@
"""
Parser for airodump-ng CSV output.
airodump-ng outputs two sections in its CSV:
1. Access Points section
2. Clients section (stations)
Example format:
BSSID, First time seen, Last time seen, channel, Speed, Privacy, Cipher, Authentication, Power, # beacons, # IV, LAN IP, ID-length, ESSID, Key
00:11:22:33:44:55, 2024-01-01 10:00:00, 2024-01-01 10:05:00, 6, 54, WPA2, CCMP, PSK, -65, 100, 10, 0.0.0.0, 6, MyWiFi,
Station MAC, First time seen, Last time seen, Power, # packets, BSSID, Probed ESSIDs
AA:BB:CC:DD:EE:FF, 2024-01-01 10:00:00, 2024-01-01 10:05:00, -70, 50, 00:11:22:33:44:55, NetworkA, NetworkB
"""
from __future__ import annotations
import csv
import io
import logging
import re
from datetime import datetime
from typing import Optional
from ..models import WiFiObservation
from ..constants import (
SECURITY_OPEN,
SECURITY_WEP,
SECURITY_WPA,
SECURITY_WPA2,
SECURITY_WPA3,
SECURITY_WPA_WPA2,
SECURITY_UNKNOWN,
CIPHER_CCMP,
CIPHER_TKIP,
CIPHER_WEP,
CIPHER_UNKNOWN,
AUTH_PSK,
AUTH_SAE,
AUTH_EAP,
AUTH_OWE,
AUTH_OPEN,
AUTH_UNKNOWN,
CHANNEL_FREQUENCIES,
)
logger = logging.getLogger(__name__)
def parse_airodump_csv(filepath: str) -> tuple[list[WiFiObservation], list[dict]]:
"""
Parse airodump-ng CSV file.
Args:
filepath: Path to the airodump CSV file.
Returns:
Tuple of (network observations, client data dicts).
"""
networks = []
clients = []
try:
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
# airodump-ng separates sections with blank lines
# Split into AP section and Station section
sections = content.split('\n\n')
for section in sections:
section = section.strip()
if not section:
continue
lines = section.split('\n')
if not lines:
continue
header = lines[0].strip()
if header.startswith('BSSID'):
# Access Points section
networks = _parse_ap_section(lines)
elif header.startswith('Station MAC'):
# Clients/Stations section
clients = _parse_client_section(lines)
except FileNotFoundError:
logger.debug(f"airodump CSV not found: {filepath}")
except Exception as e:
logger.debug(f"Error parsing airodump CSV: {e}")
return networks, clients
def _parse_ap_section(lines: list[str]) -> list[WiFiObservation]:
"""Parse the access points section of airodump CSV."""
networks = []
if len(lines) < 2:
return networks
# Parse header to get column indices
header = lines[0]
header_parts = [h.strip().lower() for h in header.split(',')]
# Find column indices
col_map = {}
for i, col in enumerate(header_parts):
if 'bssid' in col:
col_map['bssid'] = i
elif 'channel' in col and 'id-length' not in col:
col_map['channel'] = i
elif 'privacy' in col:
col_map['privacy'] = i
elif 'cipher' in col:
col_map['cipher'] = i
elif 'authentication' in col:
col_map['auth'] = i
elif 'power' in col:
col_map['power'] = i
elif 'beacons' in col or '# beacons' in col:
col_map['beacons'] = i
elif '# iv' in col or 'iv' in col:
col_map['data'] = i
elif 'essid' in col:
col_map['essid'] = i
elif 'first time seen' in col:
col_map['first_seen'] = i
elif 'last time seen' in col:
col_map['last_seen'] = i
# Parse data rows
for line in lines[1:]:
line = line.strip()
if not line:
continue
# Handle CSV properly (ESSID might contain commas)
try:
# Use CSV reader for proper parsing
reader = csv.reader(io.StringIO(line))
parts = next(reader)
except Exception:
parts = line.split(',')
parts = [p.strip() for p in parts]
if len(parts) < 5:
continue
try:
# Get BSSID
bssid_idx = col_map.get('bssid', 0)
bssid = parts[bssid_idx].upper() if bssid_idx < len(parts) else None
if not bssid or not re.match(r'^[0-9A-F:]{17}$', bssid):
continue
# Get channel
channel = None
chan_idx = col_map.get('channel', 3)
if chan_idx < len(parts):
chan_str = parts[chan_idx].strip()
if chan_str.lstrip('-').isdigit():
channel = int(chan_str)
if channel < 0:
channel = abs(channel) # Negative indicates not currently on channel
# Get power/RSSI
rssi = None
power_idx = col_map.get('power', 8)
if power_idx < len(parts):
power_str = parts[power_idx].strip()
if power_str.lstrip('-').isdigit():
rssi = int(power_str)
if rssi > 0:
rssi = -rssi # Should be negative
# Get security
privacy_idx = col_map.get('privacy', 5)
privacy = parts[privacy_idx].strip() if privacy_idx < len(parts) else ''
security = _parse_airodump_security(privacy)
# Get cipher
cipher_idx = col_map.get('cipher', 6)
cipher_str = parts[cipher_idx].strip() if cipher_idx < len(parts) else ''
cipher = _parse_airodump_cipher(cipher_str)
# Get auth
auth_idx = col_map.get('auth', 7)
auth_str = parts[auth_idx].strip() if auth_idx < len(parts) else ''
auth = _parse_airodump_auth(auth_str)
# Get ESSID (usually last column, might contain commas)
essid = None
essid_idx = col_map.get('essid', len(parts) - 1)
if essid_idx < len(parts):
essid = parts[essid_idx].strip()
# Handle special markers
if essid in ('', '<length: 0>', '<length: 0>'):
essid = None
# Get beacon count
beacon_count = 0
beacon_idx = col_map.get('beacons', 9)
if beacon_idx < len(parts):
beacon_str = parts[beacon_idx].strip()
if beacon_str.isdigit():
beacon_count = int(beacon_str)
# Get data count (IVs)
data_count = 0
data_idx = col_map.get('data', 10)
if data_idx < len(parts):
data_str = parts[data_idx].strip()
if data_str.isdigit():
data_count = int(data_str)
# Get frequency from channel
frequency_mhz = CHANNEL_FREQUENCIES.get(channel) if channel else None
obs = WiFiObservation(
timestamp=datetime.now(),
bssid=bssid,
essid=essid,
channel=channel,
frequency_mhz=frequency_mhz,
rssi=rssi,
security=security,
cipher=cipher,
auth=auth,
beacon_count=beacon_count,
data_count=data_count,
)
networks.append(obs)
except Exception as e:
logger.debug(f"Error parsing AP line: {line!r} - {e}")
return networks
def _parse_client_section(lines: list[str]) -> list[dict]:
"""Parse the clients/stations section of airodump CSV."""
clients = []
if len(lines) < 2:
return clients
# Parse header
header = lines[0]
header_parts = [h.strip().lower() for h in header.split(',')]
# Find column indices
col_map = {}
for i, col in enumerate(header_parts):
if 'station mac' in col:
col_map['mac'] = i
elif 'power' in col:
col_map['power'] = i
elif 'packets' in col or '# packets' in col:
col_map['packets'] = i
elif 'bssid' in col:
col_map['bssid'] = i
elif 'probed' in col:
col_map['probed'] = i
elif 'first time seen' in col:
col_map['first_seen'] = i
elif 'last time seen' in col:
col_map['last_seen'] = i
# Parse data rows
for line in lines[1:]:
line = line.strip()
if not line:
continue
parts = line.split(',')
parts = [p.strip() for p in parts]
if len(parts) < 3:
continue
try:
# Get MAC
mac_idx = col_map.get('mac', 0)
mac = parts[mac_idx].upper() if mac_idx < len(parts) else None
if not mac or not re.match(r'^[0-9A-F:]{17}$', mac):
continue
# Get power/RSSI
rssi = None
power_idx = col_map.get('power', 3)
if power_idx < len(parts):
power_str = parts[power_idx].strip()
if power_str.lstrip('-').isdigit():
rssi = int(power_str)
if rssi > 0:
rssi = -rssi
# Get packets
packets = 0
packets_idx = col_map.get('packets', 4)
if packets_idx < len(parts):
packets_str = parts[packets_idx].strip()
if packets_str.isdigit():
packets = int(packets_str)
# Get associated BSSID
bssid = None
bssid_idx = col_map.get('bssid', 5)
if bssid_idx < len(parts):
bssid = parts[bssid_idx].strip().upper()
if bssid == '(NOT ASSOCIATED)' or not re.match(r'^[0-9A-F:]{17}$', bssid):
bssid = None
# Get probed ESSIDs (remaining columns)
probed_idx = col_map.get('probed', 6)
probed_essids = []
if probed_idx < len(parts):
for essid in parts[probed_idx:]:
essid = essid.strip()
if essid and essid not in probed_essids:
probed_essids.append(essid)
clients.append({
'mac': mac,
'rssi': rssi,
'packets': packets,
'bssid': bssid,
'probed_essids': probed_essids,
})
except Exception as e:
logger.debug(f"Error parsing client line: {line!r} - {e}")
return clients
def _parse_airodump_security(privacy: str) -> str:
"""Parse airodump privacy field to security type."""
privacy = privacy.upper()
if not privacy or privacy in ('', 'OPN', 'OPEN'):
return SECURITY_OPEN
elif 'WPA3' in privacy:
return SECURITY_WPA3
elif 'WPA2' in privacy and 'WPA' in privacy:
return SECURITY_WPA_WPA2
elif 'WPA2' in privacy:
return SECURITY_WPA2
elif 'WPA' in privacy:
return SECURITY_WPA
elif 'WEP' in privacy:
return SECURITY_WEP
return SECURITY_UNKNOWN
def _parse_airodump_cipher(cipher: str) -> str:
"""Parse airodump cipher field."""
cipher = cipher.upper()
if 'CCMP' in cipher:
return CIPHER_CCMP
elif 'TKIP' in cipher:
return CIPHER_TKIP
elif 'WEP' in cipher:
return CIPHER_WEP
return CIPHER_UNKNOWN
def _parse_airodump_auth(auth: str) -> str:
"""Parse airodump authentication field."""
auth = auth.upper()
if 'SAE' in auth:
return AUTH_SAE
elif 'PSK' in auth:
return AUTH_PSK
elif 'MGT' in auth or 'EAP' in auth or '802.1X' in auth:
return AUTH_EAP
elif 'OWE' in auth:
return AUTH_OWE
elif 'OPN' in auth or 'OPEN' in auth:
return AUTH_OPEN
return AUTH_UNKNOWN

View File

@@ -0,0 +1,207 @@
"""
Parser for macOS airport utility output.
Example output from 'airport -s':
SSID BSSID RSSI CHANNEL HT CC SECURITY
MyWiFi 00:11:22:33:44:55 -65 6 Y US WPA2(PSK/AES/AES)
Hidden -- 00:11:22:33:44:66 -70 11 Y US WPA2(PSK/AES/AES)
"""
from __future__ import annotations
import logging
import re
from datetime import datetime
from typing import Optional
from ..models import WiFiObservation
from ..constants import (
SECURITY_OPEN,
SECURITY_WEP,
SECURITY_WPA,
SECURITY_WPA2,
SECURITY_WPA3,
SECURITY_WPA_WPA2,
SECURITY_WPA2_WPA3,
SECURITY_UNKNOWN,
CIPHER_CCMP,
CIPHER_TKIP,
CIPHER_WEP,
CIPHER_NONE,
CIPHER_UNKNOWN,
AUTH_PSK,
AUTH_SAE,
AUTH_EAP,
AUTH_OPEN,
AUTH_UNKNOWN,
WIDTH_20_MHZ,
WIDTH_40_MHZ,
get_band_from_channel,
CHANNEL_FREQUENCIES,
)
logger = logging.getLogger(__name__)
def parse_airport_scan(output: str) -> list[WiFiObservation]:
"""
Parse macOS airport scan output.
Args:
output: Raw output from 'airport -s' command.
Returns:
List of WiFiObservation objects.
"""
observations = []
lines = output.strip().split('\n')
if len(lines) < 2:
return observations
# Skip header line
for line in lines[1:]:
obs = _parse_airport_line(line)
if obs:
observations.append(obs)
return observations
def _parse_airport_line(line: str) -> Optional[WiFiObservation]:
"""Parse a single line of airport output."""
# airport output is space-aligned, need careful parsing
# Format: SSID (variable width) BSSID RSSI CHANNEL HT CC SECURITY
#
# The tricky part is SSID can contain spaces and the columns are
# aligned by whitespace. We parse from the right side.
line = line.rstrip()
if not line:
return None
try:
# Split into parts, but we need to handle SSID which may have spaces
# BSSID is always 17 chars (xx:xx:xx:xx:xx:xx)
# Find BSSID using regex
bssid_match = re.search(r'([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}', line)
if not bssid_match:
return None
bssid = bssid_match.group(0).upper()
bssid_pos = bssid_match.start()
# SSID is everything before BSSID (stripped)
ssid = line[:bssid_pos].strip()
# Handle hidden network indicator
is_hidden = False
if ssid == '--' or not ssid:
ssid = None
is_hidden = True
# Parse remainder after BSSID
remainder = line[bssid_match.end():].strip()
parts = remainder.split()
if len(parts) < 4:
# Minimal: RSSI CHANNEL HT SECURITY
return None
# Parse RSSI (negative number)
rssi_str = parts[0]
rssi = int(rssi_str) if rssi_str.lstrip('-').isdigit() else None
# Parse channel - might include +1 or -1 for 40MHz
channel_str = parts[1]
channel_match = re.match(r'(\d+)', channel_str)
channel = int(channel_match.group(1)) if channel_match else None
# Determine width from channel string
width = WIDTH_20_MHZ
if '+' in channel_str or '-' in channel_str:
width = WIDTH_40_MHZ
# HT flag (Y/N) at parts[2]
# CC (country code) at parts[3]
# Security is the rest (might have multiple parts like WPA2(PSK/AES/AES))
security_str = ' '.join(parts[4:]) if len(parts) > 4 else ''
security, cipher, auth = _parse_airport_security(security_str)
# Get frequency
frequency_mhz = CHANNEL_FREQUENCIES.get(channel) if channel else None
return WiFiObservation(
timestamp=datetime.now(),
bssid=bssid,
essid=ssid,
channel=channel,
frequency_mhz=frequency_mhz,
rssi=rssi,
security=security,
cipher=cipher,
auth=auth,
width=width,
)
except Exception as e:
logger.debug(f"Failed to parse airport line: {line!r} - {e}")
return None
def _parse_airport_security(security_str: str) -> tuple[str, str, str]:
"""
Parse airport security string.
Examples:
'WPA2(PSK/AES/AES)' -> (WPA2, CCMP, PSK)
'WPA(PSK/TKIP/TKIP)' -> (WPA, TKIP, PSK)
'WPA2(PSK,FT-PSK/AES/AES)' -> (WPA2, CCMP, PSK)
'RSN(PSK/AES,TKIP/TKIP)' -> (WPA2, CCMP, PSK)
'WEP' -> (WEP, WEP, OPEN)
'NONE' or '' -> (Open, None, Open)
"""
if not security_str or security_str.upper() == 'NONE':
return SECURITY_OPEN, CIPHER_NONE, AUTH_OPEN
security_upper = security_str.upper()
# Determine security type
security = SECURITY_UNKNOWN
if 'WPA3' in security_upper or 'SAE' in security_upper:
security = SECURITY_WPA3
elif 'RSN' in security_upper or 'WPA2' in security_upper:
security = SECURITY_WPA2
elif 'WPA' in security_upper:
security = SECURITY_WPA
elif 'WEP' in security_upper:
security = SECURITY_WEP
# Handle mixed mode
if 'WPA2' in security_upper and 'WPA3' in security_upper:
security = SECURITY_WPA2_WPA3
elif 'WPA' in security_upper and 'WPA2' in security_upper:
security = SECURITY_WPA_WPA2
# Determine cipher
cipher = CIPHER_UNKNOWN
if 'AES' in security_upper or 'CCMP' in security_upper:
cipher = CIPHER_CCMP
elif 'TKIP' in security_upper:
cipher = CIPHER_TKIP
elif 'WEP' in security_upper:
cipher = CIPHER_WEP
# Determine auth
auth = AUTH_UNKNOWN
if 'SAE' in security_upper:
auth = AUTH_SAE
elif 'PSK' in security_upper:
auth = AUTH_PSK
elif 'EAP' in security_upper or '802.1X' in security_upper:
auth = AUTH_EAP
elif security == SECURITY_OPEN:
auth = AUTH_OPEN
return security, cipher, auth

233
utils/wifi/parsers/iw.py Normal file
View File

@@ -0,0 +1,233 @@
"""
Parser for Linux iw scan output.
Example output from 'iw dev wlan0 scan':
BSS 00:11:22:33:44:55(on wlan0)
TSF: 12345678901234 usec (0d, 03:25:45)
freq: 2437
beacon interval: 100 TUs
capability: ESS Privacy ShortSlotTime (0x0411)
signal: -65.00 dBm
last seen: 100 ms ago
SSID: MyWiFi
Supported rates: 1.0* 2.0* 5.5* 11.0* 6.0 9.0 12.0 18.0
DS Parameter set: channel 6
RSN: * Version: 1
* Group cipher: CCMP
* Pairwise ciphers: CCMP
* Authentication suites: PSK
* Capabilities: 16-PTKSA-RC 1-GTKSA-RC (0x000c)
"""
from __future__ import annotations
import logging
import re
from datetime import datetime
from typing import Optional
from ..models import WiFiObservation
from ..constants import (
SECURITY_OPEN,
SECURITY_WEP,
SECURITY_WPA,
SECURITY_WPA2,
SECURITY_WPA3,
SECURITY_WPA_WPA2,
SECURITY_WPA2_WPA3,
SECURITY_UNKNOWN,
CIPHER_CCMP,
CIPHER_TKIP,
CIPHER_GCMP,
CIPHER_WEP,
CIPHER_UNKNOWN,
AUTH_PSK,
AUTH_SAE,
AUTH_EAP,
AUTH_OWE,
AUTH_OPEN,
AUTH_UNKNOWN,
WIDTH_20_MHZ,
WIDTH_40_MHZ,
WIDTH_80_MHZ,
WIDTH_160_MHZ,
get_channel_from_frequency,
)
logger = logging.getLogger(__name__)
def parse_iw_scan(output: str) -> list[WiFiObservation]:
"""
Parse iw scan output.
Args:
output: Raw output from 'iw dev <interface> scan'.
Returns:
List of WiFiObservation objects.
"""
observations = []
current_block = []
for line in output.split('\n'):
if line.startswith('BSS '):
# Start of new BSS entry
if current_block:
obs = _parse_iw_block(current_block)
if obs:
observations.append(obs)
current_block = [line]
elif current_block:
current_block.append(line)
# Parse last block
if current_block:
obs = _parse_iw_block(current_block)
if obs:
observations.append(obs)
return observations
def _parse_iw_block(lines: list[str]) -> Optional[WiFiObservation]:
"""Parse a single BSS block from iw output."""
try:
# First line: BSS 00:11:22:33:44:55(on wlan0) -- associated
first_line = lines[0]
bssid_match = re.match(r'BSS ([0-9a-fA-F:]{17})', first_line)
if not bssid_match:
return None
bssid = bssid_match.group(1).upper()
# Parse remaining fields
ssid = None
frequency_mhz = None
channel = None
rssi = None
width = WIDTH_20_MHZ
has_privacy = False
has_rsn = False
has_wpa = False
cipher = CIPHER_UNKNOWN
auth = AUTH_UNKNOWN
i = 1
while i < len(lines):
line = lines[i].strip()
if line.startswith('freq:'):
freq_match = re.search(r'freq:\s*(\d+)', line)
if freq_match:
frequency_mhz = int(freq_match.group(1))
channel = get_channel_from_frequency(frequency_mhz)
elif line.startswith('signal:'):
signal_match = re.search(r'signal:\s*(-?\d+\.?\d*)', line)
if signal_match:
rssi = int(float(signal_match.group(1)))
elif line.startswith('SSID:'):
ssid_match = re.match(r'SSID:\s*(.*)', line)
if ssid_match:
ssid = ssid_match.group(1).strip()
if not ssid or ssid == '\\x00' * len(ssid):
ssid = None
elif line.startswith('DS Parameter set:'):
chan_match = re.search(r'channel\s*(\d+)', line)
if chan_match:
channel = int(chan_match.group(1))
elif line.startswith('capability:'):
if 'Privacy' in line:
has_privacy = True
elif line.startswith('RSN:') or line.startswith('WPA:'):
is_rsn = line.startswith('RSN:')
if is_rsn:
has_rsn = True
else:
has_wpa = True
# Parse the RSN/WPA block
i += 1
while i < len(lines) and lines[i].startswith('\t\t'):
subline = lines[i].strip()
if 'Group cipher:' in subline or 'Pairwise ciphers:' in subline:
if 'CCMP' in subline:
cipher = CIPHER_CCMP
elif 'TKIP' in subline:
cipher = CIPHER_TKIP
elif 'GCMP' in subline:
cipher = CIPHER_GCMP
elif 'Authentication suites:' in subline:
if 'SAE' in subline:
auth = AUTH_SAE
elif 'PSK' in subline:
auth = AUTH_PSK
elif 'IEEE 802.1X' in subline or 'EAP' in subline:
auth = AUTH_EAP
elif 'OWE' in subline:
auth = AUTH_OWE
i += 1
continue
elif 'HT operation:' in line or 'VHT operation:' in line or 'HE operation:' in line:
# Parse width from subsequent lines
i += 1
while i < len(lines) and lines[i].startswith('\t\t'):
subline = lines[i].strip()
if 'channel width:' in subline.lower():
if '160' in subline:
width = WIDTH_160_MHZ
elif '80' in subline:
width = WIDTH_80_MHZ
elif '40' in subline:
width = WIDTH_40_MHZ
i += 1
continue
i += 1
# Determine security type
security = SECURITY_OPEN
if has_rsn and has_wpa:
security = SECURITY_WPA_WPA2
elif has_rsn:
if auth == AUTH_SAE:
security = SECURITY_WPA3
else:
security = SECURITY_WPA2
elif has_wpa:
security = SECURITY_WPA
elif has_privacy:
security = SECURITY_WEP
cipher = CIPHER_WEP
if auth == AUTH_UNKNOWN:
if security == SECURITY_OPEN:
auth = AUTH_OPEN
elif security in (SECURITY_WPA, SECURITY_WPA2, SECURITY_WPA_WPA2):
auth = AUTH_PSK
return WiFiObservation(
timestamp=datetime.now(),
bssid=bssid,
essid=ssid,
channel=channel,
frequency_mhz=frequency_mhz,
rssi=rssi,
security=security,
cipher=cipher,
auth=auth,
width=width,
)
except Exception as e:
logger.debug(f"Failed to parse iw block: {e}")
return None

View File

@@ -0,0 +1,209 @@
"""
Parser for Linux iwlist scan output.
Example output from 'iwlist wlan0 scan':
wlan0 Scan completed :
Cell 01 - Address: 00:11:22:33:44:55
Channel:6
Frequency:2.437 GHz (Channel 6)
Quality=70/70 Signal level=-40 dBm
Encryption key:on
ESSID:"MyWiFi"
Bit Rates:54 Mb/s
Mode:Master
Extra:tsf=0000000000000000
Extra: Last beacon: 100ms ago
IE: Unknown: 000A4D79576946695F4E6574
IE: IEEE 802.11i/WPA2 Version 1
Group Cipher : CCMP
Pairwise Ciphers (1) : CCMP
Authentication Suites (1) : PSK
"""
from __future__ import annotations
import logging
import re
from datetime import datetime
from typing import Optional
from ..models import WiFiObservation
from ..constants import (
SECURITY_OPEN,
SECURITY_WEP,
SECURITY_WPA,
SECURITY_WPA2,
SECURITY_WPA_WPA2,
SECURITY_UNKNOWN,
CIPHER_CCMP,
CIPHER_TKIP,
CIPHER_WEP,
CIPHER_UNKNOWN,
AUTH_PSK,
AUTH_EAP,
AUTH_OPEN,
AUTH_UNKNOWN,
get_channel_from_frequency,
CHANNEL_FREQUENCIES,
)
logger = logging.getLogger(__name__)
def parse_iwlist_scan(output: str) -> list[WiFiObservation]:
"""
Parse iwlist scan output.
Args:
output: Raw output from 'iwlist <interface> scan'.
Returns:
List of WiFiObservation objects.
"""
observations = []
current_block = []
for line in output.split('\n'):
# New cell starts with "Cell XX - Address:"
if re.match(r'\s*Cell \d+ - Address:', line):
if current_block:
obs = _parse_iwlist_block(current_block)
if obs:
observations.append(obs)
current_block = [line]
elif current_block:
current_block.append(line)
# Parse last block
if current_block:
obs = _parse_iwlist_block(current_block)
if obs:
observations.append(obs)
return observations
def _parse_iwlist_block(lines: list[str]) -> Optional[WiFiObservation]:
"""Parse a single Cell block from iwlist output."""
try:
# Extract BSSID from first line
first_line = lines[0]
bssid_match = re.search(r'Address:\s*([0-9A-Fa-f:]{17})', first_line)
if not bssid_match:
return None
bssid = bssid_match.group(1).upper()
# Parse remaining fields
ssid = None
frequency_mhz = None
channel = None
rssi = None
has_encryption = False
has_wpa = False
has_wpa2 = False
cipher = CIPHER_UNKNOWN
auth = AUTH_UNKNOWN
for line in lines[1:]:
line = line.strip()
# Channel
if line.startswith('Channel:'):
chan_match = re.search(r'Channel:(\d+)', line)
if chan_match:
channel = int(chan_match.group(1))
# Frequency
elif line.startswith('Frequency:'):
# Format: "Frequency:2.437 GHz (Channel 6)"
freq_match = re.search(r'Frequency:(\d+\.?\d*)\s*GHz', line)
if freq_match:
frequency_ghz = float(freq_match.group(1))
frequency_mhz = int(frequency_ghz * 1000)
# Also try to get channel from this line
chan_match = re.search(r'\(Channel (\d+)\)', line)
if chan_match and not channel:
channel = int(chan_match.group(1))
# Signal level
elif 'Signal level' in line:
# Format: "Quality=70/70 Signal level=-40 dBm"
signal_match = re.search(r'Signal level[=:]?\s*(-?\d+)', line)
if signal_match:
rssi = int(signal_match.group(1))
# ESSID
elif line.startswith('ESSID:'):
ssid_match = re.search(r'ESSID:"([^"]*)"', line)
if ssid_match:
ssid = ssid_match.group(1)
if not ssid:
ssid = None
# Encryption
elif line.startswith('Encryption key:'):
has_encryption = 'on' in line.lower()
# WPA/WPA2 IE
elif 'WPA2' in line or 'IEEE 802.11i' in line:
has_wpa2 = True
elif 'WPA Version' in line:
has_wpa = True
# Cipher
elif 'Group Cipher' in line or 'Pairwise Ciphers' in line:
if 'CCMP' in line:
cipher = CIPHER_CCMP
elif 'TKIP' in line:
cipher = CIPHER_TKIP
# Auth
elif 'Authentication Suites' in line:
if 'PSK' in line:
auth = AUTH_PSK
elif '802.1x' in line.lower() or 'EAP' in line:
auth = AUTH_EAP
# Derive channel from frequency if needed
if not channel and frequency_mhz:
channel = get_channel_from_frequency(frequency_mhz)
# Get frequency from channel if needed
if not frequency_mhz and channel:
frequency_mhz = CHANNEL_FREQUENCIES.get(channel)
# Determine security type
security = SECURITY_OPEN
if has_wpa2 and has_wpa:
security = SECURITY_WPA_WPA2
elif has_wpa2:
security = SECURITY_WPA2
elif has_wpa:
security = SECURITY_WPA
elif has_encryption:
security = SECURITY_WEP
cipher = CIPHER_WEP
if auth == AUTH_UNKNOWN:
if security == SECURITY_OPEN:
auth = AUTH_OPEN
elif security != SECURITY_WEP:
auth = AUTH_PSK
return WiFiObservation(
timestamp=datetime.now(),
bssid=bssid,
essid=ssid,
channel=channel,
frequency_mhz=frequency_mhz,
rssi=rssi,
security=security,
cipher=cipher,
auth=auth,
)
except Exception as e:
logger.debug(f"Failed to parse iwlist block: {e}")
return None

205
utils/wifi/parsers/nmcli.py Normal file
View File

@@ -0,0 +1,205 @@
r"""
Parser for NetworkManager nmcli output.
Example output from 'nmcli -t -f BSSID,SSID,MODE,CHAN,FREQ,RATE,SIGNAL,SECURITY device wifi list':
00\:11\:22\:33\:44\:55:MyWiFi:Infra:6:2437 MHz:130 Mbit/s:75:WPA2
00\:11\:22\:33\:44\:66::Infra:11:2462 MHz:54 Mbit/s:60:WPA2
"""
from __future__ import annotations
import logging
import re
from datetime import datetime
from typing import Optional
from ..models import WiFiObservation
from ..constants import (
SECURITY_OPEN,
SECURITY_WEP,
SECURITY_WPA,
SECURITY_WPA2,
SECURITY_WPA3,
SECURITY_WPA_WPA2,
SECURITY_WPA2_WPA3,
SECURITY_ENTERPRISE,
SECURITY_UNKNOWN,
CIPHER_CCMP,
CIPHER_TKIP,
CIPHER_UNKNOWN,
AUTH_PSK,
AUTH_SAE,
AUTH_EAP,
AUTH_OPEN,
AUTH_UNKNOWN,
get_channel_from_frequency,
get_band_from_frequency,
)
logger = logging.getLogger(__name__)
def parse_nmcli_scan(output: str) -> list[WiFiObservation]:
"""
Parse nmcli terse output.
Args:
output: Raw output from nmcli with -t flag.
Returns:
List of WiFiObservation objects.
"""
observations = []
for line in output.strip().split('\n'):
if not line:
continue
obs = _parse_nmcli_line(line)
if obs:
observations.append(obs)
return observations
def _parse_nmcli_line(line: str) -> Optional[WiFiObservation]:
"""Parse a single line of nmcli terse output."""
try:
# nmcli terse format uses : as delimiter but escapes colons in values with \:
# Need to carefully split
parts = _split_nmcli_line(line)
if len(parts) < 8:
return None
# BSSID,SSID,MODE,CHAN,FREQ,RATE,SIGNAL,SECURITY
bssid = parts[0].upper()
ssid = parts[1] if parts[1] else None
# mode = parts[2] # 'Infra' or 'Ad-Hoc'
channel_str = parts[3]
freq_str = parts[4]
# rate_str = parts[5] # e.g., '130 Mbit/s'
signal_str = parts[6]
security_str = parts[7] if len(parts) > 7 else ''
# Parse channel
channel = int(channel_str) if channel_str.isdigit() else None
# Parse frequency (e.g., "2437 MHz")
freq_match = re.match(r'(\d+)', freq_str)
frequency_mhz = int(freq_match.group(1)) if freq_match else None
# If no channel, derive from frequency
if not channel and frequency_mhz:
channel = get_channel_from_frequency(frequency_mhz)
# Parse signal strength (nmcli gives percentage 0-100)
# Convert to approximate dBm: -100 + (signal * 0.5)
# More accurate: signal 100 = -30 dBm, signal 0 = -100 dBm
rssi = None
if signal_str.isdigit():
signal_pct = int(signal_str)
rssi = int(-100 + (signal_pct * 0.7)) # Rough conversion
# Parse security
security, cipher, auth = _parse_nmcli_security(security_str)
return WiFiObservation(
timestamp=datetime.now(),
bssid=bssid,
essid=ssid,
channel=channel,
frequency_mhz=frequency_mhz,
rssi=rssi,
security=security,
cipher=cipher,
auth=auth,
)
except Exception as e:
logger.debug(f"Failed to parse nmcli line: {line!r} - {e}")
return None
def _split_nmcli_line(line: str) -> list[str]:
"""Split nmcli terse line handling escaped colons."""
parts = []
current = []
i = 0
while i < len(line):
if line[i] == '\\' and i + 1 < len(line) and line[i + 1] == ':':
# Escaped colon - add literal colon
current.append(':')
i += 2
elif line[i] == ':':
# Field delimiter
parts.append(''.join(current))
current = []
i += 1
else:
current.append(line[i])
i += 1
# Add last field
parts.append(''.join(current))
return parts
def _parse_nmcli_security(security_str: str) -> tuple[str, str, str]:
"""
Parse nmcli security string.
Examples:
'WPA2' -> (WPA2, CCMP, PSK)
'WPA1 WPA2' -> (WPA/WPA2, CCMP, PSK)
'WPA3' -> (WPA3, CCMP, SAE)
'802.1X' -> (Enterprise, CCMP, EAP)
'WEP' -> (WEP, WEP, OPEN)
'' or '--' -> (Open, None, Open)
"""
if not security_str or security_str == '--':
return SECURITY_OPEN, CIPHER_UNKNOWN, AUTH_OPEN
security_upper = security_str.upper()
# Determine security type
security = SECURITY_UNKNOWN
if '802.1X' in security_upper:
security = SECURITY_ENTERPRISE
elif 'WPA3' in security_upper:
if 'WPA2' in security_upper:
security = SECURITY_WPA2_WPA3
else:
security = SECURITY_WPA3
elif 'WPA2' in security_upper:
if 'WPA1' in security_upper or security_upper.count('WPA') > 1:
security = SECURITY_WPA_WPA2
else:
security = SECURITY_WPA2
elif 'WPA' in security_upper:
security = SECURITY_WPA
elif 'WEP' in security_upper:
security = SECURITY_WEP
# Determine cipher (assume CCMP for WPA2+)
cipher = CIPHER_UNKNOWN
if security in (SECURITY_WPA2, SECURITY_WPA3, SECURITY_WPA2_WPA3, SECURITY_ENTERPRISE):
cipher = CIPHER_CCMP
elif security == SECURITY_WPA or security == SECURITY_WPA_WPA2:
cipher = CIPHER_TKIP # Often TKIP for mixed mode
# Determine auth
auth = AUTH_UNKNOWN
if security == SECURITY_ENTERPRISE or '802.1X' in security_upper:
auth = AUTH_EAP
elif security == SECURITY_WPA3:
auth = AUTH_SAE
elif security in (SECURITY_WPA, SECURITY_WPA2, SECURITY_WPA_WPA2, SECURITY_WPA2_WPA3):
auth = AUTH_PSK
elif security == SECURITY_OPEN:
auth = AUTH_OPEN
return security, cipher, auth

1049
utils/wifi/scanner.py Normal file

File diff suppressed because it is too large Load Diff