Add WiFi v2 API endpoints for dual-mode scanning

- Add v2 capabilities, quick scan, deep scan, and status endpoints
- Add v2 networks, clients, probes, and channels endpoints
- Add v2 SSE stream, export (CSV/JSON), and baseline management
- Add recommendation_rank field to ChannelRecommendation model

The frontend was already wired up to call these v2 endpoints but they
were missing from the backend. This completes the WiFi module v2 API.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-01-21 22:53:02 +00:00
parent 45c10a8593
commit dfd4b0e89e
2 changed files with 317 additions and 0 deletions

View File

@@ -1098,3 +1098,318 @@ def stream_wifi():
response.headers['X-Accel-Buffering'] = 'no'
response.headers['Connection'] = 'keep-alive'
return response
# =============================================================================
# V2 API Endpoints - Using unified WiFi scanner
# =============================================================================
from utils.wifi.scanner import get_wifi_scanner, reset_wifi_scanner
@wifi_bp.route('/v2/capabilities')
def get_v2_capabilities():
"""Get WiFi scanning capabilities on this system."""
try:
scanner = get_wifi_scanner()
caps = scanner.check_capabilities()
return jsonify({
'platform': caps.platform,
'is_root': caps.is_root,
'can_quick_scan': caps.can_quick_scan,
'can_deep_scan': caps.can_deep_scan,
'preferred_quick_tool': caps.preferred_quick_tool,
'interfaces': caps.interfaces,
'default_interface': caps.default_interface,
'has_monitor_capable_interface': caps.has_monitor_capable_interface,
'monitor_interface': caps.monitor_interface,
'issues': caps.issues,
'tools': {
'nmcli': caps.has_nmcli,
'iw': caps.has_iw,
'iwlist': caps.has_iwlist,
'airport': caps.has_airport,
'airmon_ng': caps.has_airmon_ng,
'airodump_ng': caps.has_airodump_ng,
},
})
except Exception as e:
logger.exception("Error checking capabilities")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/scan/quick', methods=['POST'])
def v2_quick_scan():
"""Perform a quick one-shot WiFi scan using system tools."""
try:
data = request.json or {}
interface = data.get('interface')
timeout = data.get('timeout', 10.0)
scanner = get_wifi_scanner()
result = scanner.quick_scan(interface=interface, timeout=timeout)
if result.error:
return jsonify({
'error': result.error,
'access_points': [],
'channel_stats': [],
'recommendations': [],
}), 200 # Return 200 with error in body for cleaner handling
return jsonify({
'access_points': [ap.to_summary_dict() for ap in result.access_points],
'channel_stats': [s.to_dict() for s in result.channel_stats],
'recommendations': [r.to_dict() for r in result.recommendations],
'duration_seconds': result.duration_seconds,
'warnings': result.warnings,
})
except Exception as e:
logger.exception("Error in quick scan")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/scan/start', methods=['POST'])
def v2_start_scan():
"""Start continuous deep scan with airodump-ng."""
try:
data = request.json or {}
interface = data.get('interface')
band = data.get('band', 'all')
channel = data.get('channel')
scanner = get_wifi_scanner()
success = scanner.start_deep_scan(interface=interface, band=band, channel=channel)
if success:
return jsonify({'status': 'started'})
else:
status = scanner.get_status()
return jsonify({'error': status.error or 'Failed to start scan'}), 400
except Exception as e:
logger.exception("Error starting deep scan")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/scan/stop', methods=['POST'])
def v2_stop_scan():
"""Stop the current scan."""
try:
scanner = get_wifi_scanner()
scanner.stop_deep_scan()
return jsonify({'status': 'stopped'})
except Exception as e:
logger.exception("Error stopping scan")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/scan/status')
def v2_scan_status():
"""Get current scan status."""
try:
scanner = get_wifi_scanner()
status = scanner.get_status()
return jsonify({
'is_scanning': status.is_scanning,
'scan_mode': status.scan_mode,
'interface': status.interface,
'started_at': status.started_at.isoformat() if status.started_at else None,
'networks_found': status.networks_found,
'clients_found': status.clients_found,
'error': status.error,
})
except Exception as e:
logger.exception("Error getting scan status")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/networks')
def v2_get_networks():
"""Get all discovered networks."""
try:
scanner = get_wifi_scanner()
networks = scanner.access_points
return jsonify({
'networks': [ap.to_summary_dict() for ap in networks],
'total': len(networks),
})
except Exception as e:
logger.exception("Error getting networks")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/clients')
def v2_get_clients():
"""Get all discovered clients."""
try:
scanner = get_wifi_scanner()
clients = scanner.clients
return jsonify({
'clients': [c.to_dict() for c in clients],
'total': len(clients),
})
except Exception as e:
logger.exception("Error getting clients")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/probes')
def v2_get_probes():
"""Get probe requests."""
try:
scanner = get_wifi_scanner()
probes = scanner.probe_requests
return jsonify({
'probes': [p.to_dict() for p in probes[-100:]], # Last 100
'total': len(probes),
})
except Exception as e:
logger.exception("Error getting probes")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/channels')
def v2_get_channels():
"""Get channel statistics and recommendations."""
try:
scanner = get_wifi_scanner()
stats = scanner._calculate_channel_stats()
recommendations = scanner._generate_recommendations(stats)
return jsonify({
'channel_stats': [s.to_dict() for s in stats],
'recommendations': [r.to_dict() for r in recommendations],
})
except Exception as e:
logger.exception("Error getting channel stats")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/stream')
def v2_stream():
"""SSE stream for real-time WiFi events."""
def generate():
scanner = get_wifi_scanner()
for event in scanner.get_event_stream():
yield format_sse(event)
response = Response(generate(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
response.headers['Connection'] = 'keep-alive'
return response
@wifi_bp.route('/v2/export')
def v2_export():
"""Export scan data as CSV or JSON."""
try:
format_type = request.args.get('format', 'json')
data_type = request.args.get('type', 'all')
scanner = get_wifi_scanner()
if format_type == 'json':
data = {}
if data_type in ('all', 'networks'):
data['networks'] = [ap.to_summary_dict() for ap in scanner.access_points]
if data_type in ('all', 'clients'):
data['clients'] = [c.to_dict() for c in scanner.clients]
if data_type in ('all', 'probes'):
data['probes'] = [p.to_dict() for p in scanner.probe_requests]
response = Response(
json.dumps(data, indent=2, default=str),
mimetype='application/json',
)
response.headers['Content-Disposition'] = 'attachment; filename=wifi_scan.json'
return response
elif format_type == 'csv':
import csv
import io
output = io.StringIO()
writer = csv.writer(output)
# Write networks
writer.writerow(['Networks'])
writer.writerow(['BSSID', 'ESSID', 'Channel', 'Band', 'RSSI', 'Security', 'Vendor', 'Clients', 'First Seen', 'Last Seen'])
for ap in scanner.access_points:
writer.writerow([
ap.bssid,
ap.essid or '[Hidden]',
ap.channel,
ap.band,
ap.rssi_current,
ap.security,
ap.vendor,
ap.client_count,
ap.first_seen.isoformat() if ap.first_seen else '',
ap.last_seen.isoformat() if ap.last_seen else '',
])
writer.writerow([])
# Write clients
writer.writerow(['Clients'])
writer.writerow(['MAC', 'BSSID', 'Vendor', 'RSSI', 'Probed SSIDs', 'First Seen', 'Last Seen'])
for c in scanner.clients:
writer.writerow([
c.mac,
c.associated_bssid or '',
c.vendor,
c.rssi_current,
', '.join(c.probed_ssids),
c.first_seen.isoformat() if c.first_seen else '',
c.last_seen.isoformat() if c.last_seen else '',
])
response = Response(
output.getvalue(),
mimetype='text/csv',
)
response.headers['Content-Disposition'] = 'attachment; filename=wifi_scan.csv'
return response
else:
return jsonify({'error': f'Unknown format: {format_type}'}), 400
except Exception as e:
logger.exception("Error exporting data")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/baseline/set', methods=['POST'])
def v2_set_baseline():
"""Set current networks as baseline."""
try:
scanner = get_wifi_scanner()
scanner.set_baseline()
return jsonify({'status': 'baseline_set', 'count': len(scanner._baseline_networks)})
except Exception as e:
logger.exception("Error setting baseline")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/baseline/clear', methods=['POST'])
def v2_clear_baseline():
"""Clear the baseline."""
try:
scanner = get_wifi_scanner()
scanner.clear_baseline()
return jsonify({'status': 'baseline_cleared'})
except Exception as e:
logger.exception("Error clearing baseline")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/clear', methods=['POST'])
def v2_clear_data():
"""Clear all discovered data."""
try:
scanner = get_wifi_scanner()
scanner.clear_data()
return jsonify({'status': 'cleared'})
except Exception as e:
logger.exception("Error clearing data")
return jsonify({'error': str(e)}), 500

View File

@@ -441,6 +441,7 @@ class ChannelRecommendation:
score: float # Lower is better
reason: str
is_dfs: bool = False
recommendation_rank: Optional[int] = None
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
@@ -450,6 +451,7 @@ class ChannelRecommendation:
'score': round(self.score, 3),
'reason': self.reason,
'is_dfs': self.is_dfs,
'rank': self.recommendation_rank,
}