From dfd4b0e89eebfa923bd88ca096fe516f950c0e56 Mon Sep 17 00:00:00 2001 From: Smittix Date: Wed, 21 Jan 2026 22:53:02 +0000 Subject: [PATCH] Add WiFi v2 API endpoints for dual-mode scanning - Add v2 capabilities, quick scan, deep scan, and status endpoints - Add v2 networks, clients, probes, and channels endpoints - Add v2 SSE stream, export (CSV/JSON), and baseline management - Add recommendation_rank field to ChannelRecommendation model The frontend was already wired up to call these v2 endpoints but they were missing from the backend. This completes the WiFi module v2 API. Co-Authored-By: Claude Opus 4.5 --- routes/wifi.py | 315 +++++++++++++++++++++++++++++++++++++++++++ utils/wifi/models.py | 2 + 2 files changed, 317 insertions(+) diff --git a/routes/wifi.py b/routes/wifi.py index 0bd4a42..5b833cc 100644 --- a/routes/wifi.py +++ b/routes/wifi.py @@ -1098,3 +1098,318 @@ def stream_wifi(): response.headers['X-Accel-Buffering'] = 'no' response.headers['Connection'] = 'keep-alive' return response + + +# ============================================================================= +# V2 API Endpoints - Using unified WiFi scanner +# ============================================================================= + +from utils.wifi.scanner import get_wifi_scanner, reset_wifi_scanner + + +@wifi_bp.route('/v2/capabilities') +def get_v2_capabilities(): + """Get WiFi scanning capabilities on this system.""" + try: + scanner = get_wifi_scanner() + caps = scanner.check_capabilities() + return jsonify({ + 'platform': caps.platform, + 'is_root': caps.is_root, + 'can_quick_scan': caps.can_quick_scan, + 'can_deep_scan': caps.can_deep_scan, + 'preferred_quick_tool': caps.preferred_quick_tool, + 'interfaces': caps.interfaces, + 'default_interface': caps.default_interface, + 'has_monitor_capable_interface': caps.has_monitor_capable_interface, + 'monitor_interface': caps.monitor_interface, + 'issues': caps.issues, + 'tools': { + 'nmcli': caps.has_nmcli, + 'iw': caps.has_iw, + 'iwlist': caps.has_iwlist, + 'airport': caps.has_airport, + 'airmon_ng': caps.has_airmon_ng, + 'airodump_ng': caps.has_airodump_ng, + }, + }) + except Exception as e: + logger.exception("Error checking capabilities") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/scan/quick', methods=['POST']) +def v2_quick_scan(): + """Perform a quick one-shot WiFi scan using system tools.""" + try: + data = request.json or {} + interface = data.get('interface') + timeout = data.get('timeout', 10.0) + + scanner = get_wifi_scanner() + result = scanner.quick_scan(interface=interface, timeout=timeout) + + if result.error: + return jsonify({ + 'error': result.error, + 'access_points': [], + 'channel_stats': [], + 'recommendations': [], + }), 200 # Return 200 with error in body for cleaner handling + + return jsonify({ + 'access_points': [ap.to_summary_dict() for ap in result.access_points], + 'channel_stats': [s.to_dict() for s in result.channel_stats], + 'recommendations': [r.to_dict() for r in result.recommendations], + 'duration_seconds': result.duration_seconds, + 'warnings': result.warnings, + }) + except Exception as e: + logger.exception("Error in quick scan") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/scan/start', methods=['POST']) +def v2_start_scan(): + """Start continuous deep scan with airodump-ng.""" + try: + data = request.json or {} + interface = data.get('interface') + band = data.get('band', 'all') + channel = data.get('channel') + + scanner = get_wifi_scanner() + success = scanner.start_deep_scan(interface=interface, band=band, channel=channel) + + if success: + return jsonify({'status': 'started'}) + else: + status = scanner.get_status() + return jsonify({'error': status.error or 'Failed to start scan'}), 400 + except Exception as e: + logger.exception("Error starting deep scan") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/scan/stop', methods=['POST']) +def v2_stop_scan(): + """Stop the current scan.""" + try: + scanner = get_wifi_scanner() + scanner.stop_deep_scan() + return jsonify({'status': 'stopped'}) + except Exception as e: + logger.exception("Error stopping scan") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/scan/status') +def v2_scan_status(): + """Get current scan status.""" + try: + scanner = get_wifi_scanner() + status = scanner.get_status() + return jsonify({ + 'is_scanning': status.is_scanning, + 'scan_mode': status.scan_mode, + 'interface': status.interface, + 'started_at': status.started_at.isoformat() if status.started_at else None, + 'networks_found': status.networks_found, + 'clients_found': status.clients_found, + 'error': status.error, + }) + except Exception as e: + logger.exception("Error getting scan status") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/networks') +def v2_get_networks(): + """Get all discovered networks.""" + try: + scanner = get_wifi_scanner() + networks = scanner.access_points + return jsonify({ + 'networks': [ap.to_summary_dict() for ap in networks], + 'total': len(networks), + }) + except Exception as e: + logger.exception("Error getting networks") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/clients') +def v2_get_clients(): + """Get all discovered clients.""" + try: + scanner = get_wifi_scanner() + clients = scanner.clients + return jsonify({ + 'clients': [c.to_dict() for c in clients], + 'total': len(clients), + }) + except Exception as e: + logger.exception("Error getting clients") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/probes') +def v2_get_probes(): + """Get probe requests.""" + try: + scanner = get_wifi_scanner() + probes = scanner.probe_requests + return jsonify({ + 'probes': [p.to_dict() for p in probes[-100:]], # Last 100 + 'total': len(probes), + }) + except Exception as e: + logger.exception("Error getting probes") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/channels') +def v2_get_channels(): + """Get channel statistics and recommendations.""" + try: + scanner = get_wifi_scanner() + stats = scanner._calculate_channel_stats() + recommendations = scanner._generate_recommendations(stats) + return jsonify({ + 'channel_stats': [s.to_dict() for s in stats], + 'recommendations': [r.to_dict() for r in recommendations], + }) + except Exception as e: + logger.exception("Error getting channel stats") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/stream') +def v2_stream(): + """SSE stream for real-time WiFi events.""" + def generate(): + scanner = get_wifi_scanner() + for event in scanner.get_event_stream(): + yield format_sse(event) + + response = Response(generate(), mimetype='text/event-stream') + response.headers['Cache-Control'] = 'no-cache' + response.headers['X-Accel-Buffering'] = 'no' + response.headers['Connection'] = 'keep-alive' + return response + + +@wifi_bp.route('/v2/export') +def v2_export(): + """Export scan data as CSV or JSON.""" + try: + format_type = request.args.get('format', 'json') + data_type = request.args.get('type', 'all') + + scanner = get_wifi_scanner() + + if format_type == 'json': + data = {} + if data_type in ('all', 'networks'): + data['networks'] = [ap.to_summary_dict() for ap in scanner.access_points] + if data_type in ('all', 'clients'): + data['clients'] = [c.to_dict() for c in scanner.clients] + if data_type in ('all', 'probes'): + data['probes'] = [p.to_dict() for p in scanner.probe_requests] + + response = Response( + json.dumps(data, indent=2, default=str), + mimetype='application/json', + ) + response.headers['Content-Disposition'] = 'attachment; filename=wifi_scan.json' + return response + + elif format_type == 'csv': + import csv + import io + + output = io.StringIO() + writer = csv.writer(output) + + # Write networks + writer.writerow(['Networks']) + writer.writerow(['BSSID', 'ESSID', 'Channel', 'Band', 'RSSI', 'Security', 'Vendor', 'Clients', 'First Seen', 'Last Seen']) + for ap in scanner.access_points: + writer.writerow([ + ap.bssid, + ap.essid or '[Hidden]', + ap.channel, + ap.band, + ap.rssi_current, + ap.security, + ap.vendor, + ap.client_count, + ap.first_seen.isoformat() if ap.first_seen else '', + ap.last_seen.isoformat() if ap.last_seen else '', + ]) + + writer.writerow([]) + + # Write clients + writer.writerow(['Clients']) + writer.writerow(['MAC', 'BSSID', 'Vendor', 'RSSI', 'Probed SSIDs', 'First Seen', 'Last Seen']) + for c in scanner.clients: + writer.writerow([ + c.mac, + c.associated_bssid or '', + c.vendor, + c.rssi_current, + ', '.join(c.probed_ssids), + c.first_seen.isoformat() if c.first_seen else '', + c.last_seen.isoformat() if c.last_seen else '', + ]) + + response = Response( + output.getvalue(), + mimetype='text/csv', + ) + response.headers['Content-Disposition'] = 'attachment; filename=wifi_scan.csv' + return response + + else: + return jsonify({'error': f'Unknown format: {format_type}'}), 400 + + except Exception as e: + logger.exception("Error exporting data") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/baseline/set', methods=['POST']) +def v2_set_baseline(): + """Set current networks as baseline.""" + try: + scanner = get_wifi_scanner() + scanner.set_baseline() + return jsonify({'status': 'baseline_set', 'count': len(scanner._baseline_networks)}) + except Exception as e: + logger.exception("Error setting baseline") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/baseline/clear', methods=['POST']) +def v2_clear_baseline(): + """Clear the baseline.""" + try: + scanner = get_wifi_scanner() + scanner.clear_baseline() + return jsonify({'status': 'baseline_cleared'}) + except Exception as e: + logger.exception("Error clearing baseline") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/clear', methods=['POST']) +def v2_clear_data(): + """Clear all discovered data.""" + try: + scanner = get_wifi_scanner() + scanner.clear_data() + return jsonify({'status': 'cleared'}) + except Exception as e: + logger.exception("Error clearing data") + return jsonify({'error': str(e)}), 500 diff --git a/utils/wifi/models.py b/utils/wifi/models.py index 4af8470..fa2f817 100644 --- a/utils/wifi/models.py +++ b/utils/wifi/models.py @@ -441,6 +441,7 @@ class ChannelRecommendation: score: float # Lower is better reason: str is_dfs: bool = False + recommendation_rank: Optional[int] = None def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" @@ -450,6 +451,7 @@ class ChannelRecommendation: 'score': round(self.score, 3), 'reason': self.reason, 'is_dfs': self.is_dfs, + 'rank': self.recommendation_rank, }