diff --git a/routes/wifi.py b/routes/wifi.py index 0bd4a42..5b833cc 100644 --- a/routes/wifi.py +++ b/routes/wifi.py @@ -1098,3 +1098,318 @@ def stream_wifi(): response.headers['X-Accel-Buffering'] = 'no' response.headers['Connection'] = 'keep-alive' return response + + +# ============================================================================= +# V2 API Endpoints - Using unified WiFi scanner +# ============================================================================= + +from utils.wifi.scanner import get_wifi_scanner, reset_wifi_scanner + + +@wifi_bp.route('/v2/capabilities') +def get_v2_capabilities(): + """Get WiFi scanning capabilities on this system.""" + try: + scanner = get_wifi_scanner() + caps = scanner.check_capabilities() + return jsonify({ + 'platform': caps.platform, + 'is_root': caps.is_root, + 'can_quick_scan': caps.can_quick_scan, + 'can_deep_scan': caps.can_deep_scan, + 'preferred_quick_tool': caps.preferred_quick_tool, + 'interfaces': caps.interfaces, + 'default_interface': caps.default_interface, + 'has_monitor_capable_interface': caps.has_monitor_capable_interface, + 'monitor_interface': caps.monitor_interface, + 'issues': caps.issues, + 'tools': { + 'nmcli': caps.has_nmcli, + 'iw': caps.has_iw, + 'iwlist': caps.has_iwlist, + 'airport': caps.has_airport, + 'airmon_ng': caps.has_airmon_ng, + 'airodump_ng': caps.has_airodump_ng, + }, + }) + except Exception as e: + logger.exception("Error checking capabilities") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/scan/quick', methods=['POST']) +def v2_quick_scan(): + """Perform a quick one-shot WiFi scan using system tools.""" + try: + data = request.json or {} + interface = data.get('interface') + timeout = data.get('timeout', 10.0) + + scanner = get_wifi_scanner() + result = scanner.quick_scan(interface=interface, timeout=timeout) + + if result.error: + return jsonify({ + 'error': result.error, + 'access_points': [], + 'channel_stats': [], + 'recommendations': [], + }), 200 # Return 200 with error in body for cleaner handling + + return jsonify({ + 'access_points': [ap.to_summary_dict() for ap in result.access_points], + 'channel_stats': [s.to_dict() for s in result.channel_stats], + 'recommendations': [r.to_dict() for r in result.recommendations], + 'duration_seconds': result.duration_seconds, + 'warnings': result.warnings, + }) + except Exception as e: + logger.exception("Error in quick scan") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/scan/start', methods=['POST']) +def v2_start_scan(): + """Start continuous deep scan with airodump-ng.""" + try: + data = request.json or {} + interface = data.get('interface') + band = data.get('band', 'all') + channel = data.get('channel') + + scanner = get_wifi_scanner() + success = scanner.start_deep_scan(interface=interface, band=band, channel=channel) + + if success: + return jsonify({'status': 'started'}) + else: + status = scanner.get_status() + return jsonify({'error': status.error or 'Failed to start scan'}), 400 + except Exception as e: + logger.exception("Error starting deep scan") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/scan/stop', methods=['POST']) +def v2_stop_scan(): + """Stop the current scan.""" + try: + scanner = get_wifi_scanner() + scanner.stop_deep_scan() + return jsonify({'status': 'stopped'}) + except Exception as e: + logger.exception("Error stopping scan") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/scan/status') +def v2_scan_status(): + """Get current scan status.""" + try: + scanner = get_wifi_scanner() + status = scanner.get_status() + return jsonify({ + 'is_scanning': status.is_scanning, + 'scan_mode': status.scan_mode, + 'interface': status.interface, + 'started_at': status.started_at.isoformat() if status.started_at else None, + 'networks_found': status.networks_found, + 'clients_found': status.clients_found, + 'error': status.error, + }) + except Exception as e: + logger.exception("Error getting scan status") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/networks') +def v2_get_networks(): + """Get all discovered networks.""" + try: + scanner = get_wifi_scanner() + networks = scanner.access_points + return jsonify({ + 'networks': [ap.to_summary_dict() for ap in networks], + 'total': len(networks), + }) + except Exception as e: + logger.exception("Error getting networks") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/clients') +def v2_get_clients(): + """Get all discovered clients.""" + try: + scanner = get_wifi_scanner() + clients = scanner.clients + return jsonify({ + 'clients': [c.to_dict() for c in clients], + 'total': len(clients), + }) + except Exception as e: + logger.exception("Error getting clients") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/probes') +def v2_get_probes(): + """Get probe requests.""" + try: + scanner = get_wifi_scanner() + probes = scanner.probe_requests + return jsonify({ + 'probes': [p.to_dict() for p in probes[-100:]], # Last 100 + 'total': len(probes), + }) + except Exception as e: + logger.exception("Error getting probes") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/channels') +def v2_get_channels(): + """Get channel statistics and recommendations.""" + try: + scanner = get_wifi_scanner() + stats = scanner._calculate_channel_stats() + recommendations = scanner._generate_recommendations(stats) + return jsonify({ + 'channel_stats': [s.to_dict() for s in stats], + 'recommendations': [r.to_dict() for r in recommendations], + }) + except Exception as e: + logger.exception("Error getting channel stats") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/stream') +def v2_stream(): + """SSE stream for real-time WiFi events.""" + def generate(): + scanner = get_wifi_scanner() + for event in scanner.get_event_stream(): + yield format_sse(event) + + response = Response(generate(), mimetype='text/event-stream') + response.headers['Cache-Control'] = 'no-cache' + response.headers['X-Accel-Buffering'] = 'no' + response.headers['Connection'] = 'keep-alive' + return response + + +@wifi_bp.route('/v2/export') +def v2_export(): + """Export scan data as CSV or JSON.""" + try: + format_type = request.args.get('format', 'json') + data_type = request.args.get('type', 'all') + + scanner = get_wifi_scanner() + + if format_type == 'json': + data = {} + if data_type in ('all', 'networks'): + data['networks'] = [ap.to_summary_dict() for ap in scanner.access_points] + if data_type in ('all', 'clients'): + data['clients'] = [c.to_dict() for c in scanner.clients] + if data_type in ('all', 'probes'): + data['probes'] = [p.to_dict() for p in scanner.probe_requests] + + response = Response( + json.dumps(data, indent=2, default=str), + mimetype='application/json', + ) + response.headers['Content-Disposition'] = 'attachment; filename=wifi_scan.json' + return response + + elif format_type == 'csv': + import csv + import io + + output = io.StringIO() + writer = csv.writer(output) + + # Write networks + writer.writerow(['Networks']) + writer.writerow(['BSSID', 'ESSID', 'Channel', 'Band', 'RSSI', 'Security', 'Vendor', 'Clients', 'First Seen', 'Last Seen']) + for ap in scanner.access_points: + writer.writerow([ + ap.bssid, + ap.essid or '[Hidden]', + ap.channel, + ap.band, + ap.rssi_current, + ap.security, + ap.vendor, + ap.client_count, + ap.first_seen.isoformat() if ap.first_seen else '', + ap.last_seen.isoformat() if ap.last_seen else '', + ]) + + writer.writerow([]) + + # Write clients + writer.writerow(['Clients']) + writer.writerow(['MAC', 'BSSID', 'Vendor', 'RSSI', 'Probed SSIDs', 'First Seen', 'Last Seen']) + for c in scanner.clients: + writer.writerow([ + c.mac, + c.associated_bssid or '', + c.vendor, + c.rssi_current, + ', '.join(c.probed_ssids), + c.first_seen.isoformat() if c.first_seen else '', + c.last_seen.isoformat() if c.last_seen else '', + ]) + + response = Response( + output.getvalue(), + mimetype='text/csv', + ) + response.headers['Content-Disposition'] = 'attachment; filename=wifi_scan.csv' + return response + + else: + return jsonify({'error': f'Unknown format: {format_type}'}), 400 + + except Exception as e: + logger.exception("Error exporting data") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/baseline/set', methods=['POST']) +def v2_set_baseline(): + """Set current networks as baseline.""" + try: + scanner = get_wifi_scanner() + scanner.set_baseline() + return jsonify({'status': 'baseline_set', 'count': len(scanner._baseline_networks)}) + except Exception as e: + logger.exception("Error setting baseline") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/baseline/clear', methods=['POST']) +def v2_clear_baseline(): + """Clear the baseline.""" + try: + scanner = get_wifi_scanner() + scanner.clear_baseline() + return jsonify({'status': 'baseline_cleared'}) + except Exception as e: + logger.exception("Error clearing baseline") + return jsonify({'error': str(e)}), 500 + + +@wifi_bp.route('/v2/clear', methods=['POST']) +def v2_clear_data(): + """Clear all discovered data.""" + try: + scanner = get_wifi_scanner() + scanner.clear_data() + return jsonify({'status': 'cleared'}) + except Exception as e: + logger.exception("Error clearing data") + return jsonify({'error': str(e)}), 500 diff --git a/utils/wifi/models.py b/utils/wifi/models.py index 4af8470..fa2f817 100644 --- a/utils/wifi/models.py +++ b/utils/wifi/models.py @@ -441,6 +441,7 @@ class ChannelRecommendation: score: float # Lower is better reason: str is_dfs: bool = False + recommendation_rank: Optional[int] = None def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" @@ -450,6 +451,7 @@ class ChannelRecommendation: 'score': round(self.score, 3), 'reason': self.reason, 'is_dfs': self.is_dfs, + 'rank': self.recommendation_rank, }