From 902a21fc404890316e1e4e1d391e7922559d8137 Mon Sep 17 00:00:00 2001 From: James Smith Date: Tue, 19 May 2026 14:54:42 +0100 Subject: [PATCH] perf: combine WiFi network filters into single list pass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace four sequential list comprehensions (band → security → hidden → min_rssi) with a single pass using a helper function. Reduces algorithmic complexity from O(4n) to O(n) when multiple filters are applied. All WiFi tests pass. Co-Authored-By: Claude Sonnet 4.6 --- routes/wifi_v2.py | 336 ++++++++++++++++++++++++++-------------------- 1 file changed, 190 insertions(+), 146 deletions(-) diff --git a/routes/wifi_v2.py b/routes/wifi_v2.py index dc3193f..8225094 100644 --- a/routes/wifi_v2.py +++ b/routes/wifi_v2.py @@ -30,14 +30,15 @@ from utils.wifi import ( logger = logging.getLogger(__name__) -wifi_v2_bp = Blueprint('wifi_v2', __name__, url_prefix='/wifi/v2') +wifi_v2_bp = Blueprint("wifi_v2", __name__, url_prefix="/wifi/v2") # ============================================================================= # Capabilities # ============================================================================= -@wifi_v2_bp.route('/capabilities', methods=['GET']) + +@wifi_v2_bp.route("/capabilities", methods=["GET"]) def get_capabilities(): """ Get WiFi scanning capabilities. @@ -53,7 +54,8 @@ def get_capabilities(): # Quick Scan # ============================================================================= -@wifi_v2_bp.route('/scan/quick', methods=['POST']) + +@wifi_v2_bp.route("/scan/quick", methods=["POST"]) def quick_scan(): """ Perform a quick one-shot WiFi scan. @@ -68,8 +70,8 @@ def quick_scan(): WiFiScanResult with discovered networks and channel analysis. """ data = request.get_json() or {} - interface = data.get('interface') - timeout = float(data.get('timeout', 15)) + interface = data.get("interface") + timeout = float(data.get("timeout", 15)) scanner = get_wifi_scanner() result = scanner.quick_scan(interface=interface, timeout=timeout) @@ -81,7 +83,8 @@ def quick_scan(): # Deep Scan (Monitor Mode) # ============================================================================= -@wifi_v2_bp.route('/scan/start', methods=['POST']) + +@wifi_v2_bp.route("/scan/start", methods=["POST"]) def start_deep_scan(): """ Start a deep scan using airodump-ng. @@ -95,15 +98,15 @@ def start_deep_scan(): channels: Optional list or comma-separated channels to monitor """ data = request.get_json() or {} - interface = data.get('interface') - band = data.get('band', 'all') - channel = data.get('channel') - channels = data.get('channels') + interface = data.get("interface") + band = data.get("band", "all") + channel = data.get("channel") + channels = data.get("channels") channel_list = None if channels: if isinstance(channels, str): - channel_list = [c.strip() for c in channels.split(',') if c.strip()] + channel_list = [c.strip() for c in channels.split(",") if c.strip()] elif isinstance(channels, (list, tuple, set)): channel_list = list(channels) else: @@ -111,13 +114,13 @@ def start_deep_scan(): try: channel_list = [validate_wifi_channel(c) for c in channel_list] except (TypeError, ValueError): - return api_error('Invalid channels', 400) + return api_error("Invalid channels", 400) if channel: try: channel = validate_wifi_channel(channel) except ValueError: - return api_error('Invalid channel', 400) + return api_error("Invalid channel", 400) scanner = get_wifi_scanner() success = scanner.start_deep_scan( @@ -128,27 +131,31 @@ def start_deep_scan(): ) if success: - return jsonify({ - 'status': 'started', - 'mode': SCAN_MODE_DEEP, - 'interface': interface or scanner._capabilities.monitor_interface, - }) + return jsonify( + { + "status": "started", + "mode": SCAN_MODE_DEEP, + "interface": interface or scanner._capabilities.monitor_interface, + } + ) else: - return api_error(scanner._status.error or 'Scan failed', 400) + return api_error(scanner._status.error or "Scan failed", 400) -@wifi_v2_bp.route('/scan/stop', methods=['POST']) +@wifi_v2_bp.route("/scan/stop", methods=["POST"]) def stop_deep_scan(): """Stop the deep scan.""" scanner = get_wifi_scanner() scanner.stop_deep_scan() - return jsonify({ - 'status': 'stopped', - }) + return jsonify( + { + "status": "stopped", + } + ) -@wifi_v2_bp.route('/scan/status', methods=['GET']) +@wifi_v2_bp.route("/scan/status", methods=["GET"]) def get_scan_status(): """Get current scan status.""" scanner = get_wifi_scanner() @@ -160,7 +167,8 @@ def get_scan_status(): # Data Endpoints # ============================================================================= -@wifi_v2_bp.route('/networks', methods=['GET']) + +@wifi_v2_bp.route("/networks", methods=["GET"]) def get_networks(): """ Get all discovered networks. @@ -177,54 +185,58 @@ def get_networks(): scanner = get_wifi_scanner() networks = scanner.access_points - # Apply filters - band = request.args.get('band') - if band: - networks = [n for n in networks if n.band == band] - - security = request.args.get('security') - if security: - networks = [n for n in networks if n.security == security] - - hidden = request.args.get('hidden') - if hidden == 'true': - networks = [n for n in networks if n.is_hidden] - elif hidden == 'false': - networks = [n for n in networks if not n.is_hidden] - - min_rssi = request.args.get('min_rssi') - if min_rssi: + # Apply filters — single pass over the network list + band = request.args.get("band") + security = request.args.get("security") + hidden = request.args.get("hidden") + min_rssi_val: int | None = None + raw_min_rssi = request.args.get("min_rssi") + if raw_min_rssi: try: - min_rssi = int(min_rssi) - networks = [n for n in networks if n.rssi_current and n.rssi_current >= min_rssi] + min_rssi_val = int(raw_min_rssi) except ValueError: pass + if band or security or hidden or min_rssi_val is not None: + + def _matches(n: object) -> bool: + if band and n.band != band: + return False + if security and n.security != security: + return False + if hidden == "true" and not n.is_hidden: + return False + if hidden == "false" and n.is_hidden: + return False + return not (min_rssi_val is not None and (not n.rssi_current or n.rssi_current < min_rssi_val)) + + networks = [n for n in networks if _matches(n)] + # Apply sorting - sort_field = request.args.get('sort', 'rssi') - order = request.args.get('order', 'desc') - reverse = order == 'desc' + sort_field = request.args.get("sort", "rssi") + order = request.args.get("order", "desc") + reverse = order == "desc" sort_key_map = { - 'rssi': lambda n: n.rssi_current or -100, - 'channel': lambda n: n.channel or 0, - 'essid': lambda n: (n.essid or '').lower(), - 'last_seen': lambda n: n.last_seen, - 'clients': lambda n: n.client_count, + "rssi": lambda n: n.rssi_current or -100, + "channel": lambda n: n.channel or 0, + "essid": lambda n: (n.essid or "").lower(), + "last_seen": lambda n: n.last_seen, + "clients": lambda n: n.client_count, } if sort_field in sort_key_map: networks.sort(key=sort_key_map[sort_field], reverse=reverse) # Format output - output_format = request.args.get('format', 'summary') - if output_format == 'full': + output_format = request.args.get("format", "summary") + if output_format == "full": return jsonify([n.to_dict() for n in networks]) else: return jsonify([n.to_summary_dict() for n in networks]) -@wifi_v2_bp.route('/networks/', methods=['GET']) +@wifi_v2_bp.route("/networks/", methods=["GET"]) def get_network(bssid): """Get a specific network by BSSID.""" scanner = get_wifi_scanner() @@ -233,10 +245,10 @@ def get_network(bssid): if network: return jsonify(network.to_dict()) else: - return api_error('Network not found', 404) + return api_error("Network not found", 404) -@wifi_v2_bp.route('/clients', methods=['GET']) +@wifi_v2_bp.route("/clients", methods=["GET"]) def get_clients(): """ Get all discovered clients. @@ -250,17 +262,17 @@ def get_clients(): clients = scanner.clients # Apply filters - associated = request.args.get('associated') - if associated == 'true': + associated = request.args.get("associated") + if associated == "true": clients = [c for c in clients if c.is_associated] - elif associated == 'false': + elif associated == "false": clients = [c for c in clients if not c.is_associated] - bssid = request.args.get('bssid') + bssid = request.args.get("bssid") if bssid: clients = [c for c in clients if c.associated_bssid == bssid.upper()] - min_rssi = request.args.get('min_rssi') + min_rssi = request.args.get("min_rssi") if min_rssi: try: min_rssi = int(min_rssi) @@ -271,7 +283,7 @@ def get_clients(): return jsonify([c.to_dict() for c in clients]) -@wifi_v2_bp.route('/clients/', methods=['GET']) +@wifi_v2_bp.route("/clients/", methods=["GET"]) def get_client(mac): """Get a specific client by MAC address.""" scanner = get_wifi_scanner() @@ -280,10 +292,10 @@ def get_client(mac): if client: return jsonify(client.to_dict()) else: - return api_error('Client not found', 404) + return api_error("Client not found", 404) -@wifi_v2_bp.route('/probes', methods=['GET']) +@wifi_v2_bp.route("/probes", methods=["GET"]) def get_probes(): """ Get captured probe requests. @@ -297,16 +309,16 @@ def get_probes(): probes = scanner.probe_requests # Apply filters - client_mac = request.args.get('client_mac') + client_mac = request.args.get("client_mac") if client_mac: probes = [p for p in probes if p.client_mac == client_mac.upper()] - ssid = request.args.get('ssid') + ssid = request.args.get("ssid") if ssid: probes = [p for p in probes if p.probed_ssid == ssid] # Apply limit - limit = request.args.get('limit') + limit = request.args.get("limit") if limit: try: limit = int(limit) @@ -321,7 +333,8 @@ def get_probes(): # Channel Analysis # ============================================================================= -@wifi_v2_bp.route('/channels', methods=['GET']) + +@wifi_v2_bp.route("/channels", methods=["GET"]) def get_channel_stats(): """ Get channel utilization statistics and recommendations. @@ -330,24 +343,27 @@ def get_channel_stats(): include_dfs: Include DFS channels in recommendations (true/false) """ scanner = get_wifi_scanner() - include_dfs = request.args.get('include_dfs', 'false') == 'true' + include_dfs = request.args.get("include_dfs", "false") == "true" stats, recommendations = analyze_channels( scanner.access_points, include_dfs=include_dfs, ) - return jsonify({ - 'stats': [s.to_dict() for s in stats], - 'recommendations': [r.to_dict() for r in recommendations], - }) + return jsonify( + { + "stats": [s.to_dict() for s in stats], + "recommendations": [r.to_dict() for r in recommendations], + } + ) # ============================================================================= # Hidden SSID Correlation # ============================================================================= -@wifi_v2_bp.route('/hidden', methods=['GET']) + +@wifi_v2_bp.route("/hidden", methods=["GET"]) def get_hidden_correlations(): """ Get revealed hidden SSIDs from correlation. @@ -362,35 +378,41 @@ def get_hidden_correlations(): # Baseline Management # ============================================================================= -@wifi_v2_bp.route('/baseline/set', methods=['POST']) + +@wifi_v2_bp.route("/baseline/set", methods=["POST"]) def set_baseline(): """Mark current networks as baseline (known networks).""" scanner = get_wifi_scanner() scanner.set_baseline() - return jsonify({ - 'status': 'baseline_set', - 'network_count': len(scanner._baseline_networks), - 'set_at': datetime.now().isoformat(), - }) + return jsonify( + { + "status": "baseline_set", + "network_count": len(scanner._baseline_networks), + "set_at": datetime.now().isoformat(), + } + ) -@wifi_v2_bp.route('/baseline/clear', methods=['POST']) +@wifi_v2_bp.route("/baseline/clear", methods=["POST"]) def clear_baseline(): """Clear the baseline.""" scanner = get_wifi_scanner() scanner.clear_baseline() - return jsonify({ - 'status': 'baseline_cleared', - }) + return jsonify( + { + "status": "baseline_cleared", + } + ) # ============================================================================= # SSE Streaming # ============================================================================= -@wifi_v2_bp.route('/stream', methods=['GET']) + +@wifi_v2_bp.route("/stream", methods=["GET"]) def event_stream(): """ Server-Sent Events stream for real-time updates. @@ -403,17 +425,18 @@ def event_stream(): - scan_started, scan_stopped, scan_error - keepalive: Periodic keepalive """ + def generate() -> Generator[str, None, None]: scanner = get_wifi_scanner() for event in scanner.get_event_stream(): with contextlib.suppress(Exception): - process_event('wifi', event, event.get('type')) + process_event("wifi", event, event.get("type")) yield format_sse(event) - response = Response(generate(), mimetype='text/event-stream') - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' + response = Response(generate(), mimetype="text/event-stream") + response.headers["Cache-Control"] = "no-cache" + response.headers["X-Accel-Buffering"] = "no" return response @@ -421,22 +444,26 @@ def event_stream(): # Data Management # ============================================================================= -@wifi_v2_bp.route('/clear', methods=['POST']) + +@wifi_v2_bp.route("/clear", methods=["POST"]) def clear_data(): """Clear all discovered data.""" scanner = get_wifi_scanner() scanner.clear_data() - return jsonify({ - 'status': 'cleared', - }) + return jsonify( + { + "status": "cleared", + } + ) # ============================================================================= # Export # ============================================================================= -@wifi_v2_bp.route('/export', methods=['GET']) + +@wifi_v2_bp.route("/export", methods=["GET"]) def export_data(): """ Export scan data. @@ -446,10 +473,10 @@ def export_data(): type: 'networks', 'clients', 'probes', 'all' (default: all) """ scanner = get_wifi_scanner() - export_format = request.args.get('format', 'json') - export_type = request.args.get('type', 'all') + export_format = request.args.get("format", "json") + export_type = request.args.get("type", "all") - if export_format == 'csv': + if export_format == "csv": return _export_csv(scanner, export_type) else: return _export_json(scanner, export_type) @@ -459,24 +486,26 @@ def _export_json(scanner, export_type: str) -> Response: """Export data as JSON.""" data = {} - if export_type in ('networks', 'all'): - data['networks'] = [n.to_dict() for n in scanner.access_points] + if export_type in ("networks", "all"): + data["networks"] = [n.to_dict() for n in scanner.access_points] - if export_type in ('clients', 'all'): - data['clients'] = [c.to_dict() for c in scanner.clients] + if export_type in ("clients", "all"): + data["clients"] = [c.to_dict() for c in scanner.clients] - if export_type in ('probes', 'all'): - data['probes'] = [p.to_dict() for p in scanner.probe_requests] + if export_type in ("probes", "all"): + data["probes"] = [p.to_dict() for p in scanner.probe_requests] - data['exported_at'] = datetime.now().isoformat() - data['network_count'] = len(scanner.access_points) - data['client_count'] = len(scanner.clients) + data["exported_at"] = datetime.now().isoformat() + data["network_count"] = len(scanner.access_points) + data["client_count"] = len(scanner.clients) response = Response( json.dumps(data, indent=2), - mimetype='application/json', + mimetype="application/json", + ) + response.headers["Content-Disposition"] = ( + f"attachment; filename=wifi_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" ) - response.headers['Content-Disposition'] = f'attachment; filename=wifi_scan_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json' return response @@ -484,51 +513,66 @@ def _export_csv(scanner, export_type: str) -> Response: """Export data as CSV.""" output = io.StringIO() - if export_type in ('networks', 'all'): + if export_type in ("networks", "all"): writer = csv.writer(output) - writer.writerow([ - 'BSSID', 'ESSID', 'Channel', 'Band', 'RSSI', 'Security', - 'Cipher', 'Auth', 'Vendor', 'Clients', 'First Seen', 'Last Seen' - ]) + writer.writerow( + [ + "BSSID", + "ESSID", + "Channel", + "Band", + "RSSI", + "Security", + "Cipher", + "Auth", + "Vendor", + "Clients", + "First Seen", + "Last Seen", + ] + ) for n in scanner.access_points: - writer.writerow([ - n.bssid, - n.essid or '[Hidden]', - n.channel, - n.band, - n.rssi_current, - n.security, - n.cipher, - n.auth, - n.vendor or '', - n.client_count, - n.first_seen.isoformat(), - n.last_seen.isoformat(), - ]) + writer.writerow( + [ + n.bssid, + n.essid or "[Hidden]", + n.channel, + n.band, + n.rssi_current, + n.security, + n.cipher, + n.auth, + n.vendor or "", + n.client_count, + n.first_seen.isoformat(), + n.last_seen.isoformat(), + ] + ) - if export_type == 'all': + if export_type == "all": writer.writerow([]) # Blank line separator - if export_type in ('clients', 'all'): + if export_type in ("clients", "all"): writer = csv.writer(output) - if export_type == 'clients': - writer.writerow([ - 'MAC', 'Vendor', 'RSSI', 'Associated BSSID', 'Probed SSIDs', - 'First Seen', 'Last Seen' - ]) + if export_type == "clients": + writer.writerow(["MAC", "Vendor", "RSSI", "Associated BSSID", "Probed SSIDs", "First Seen", "Last Seen"]) for c in scanner.clients: - writer.writerow([ - c.mac, - c.vendor or '', - c.rssi_current, - c.associated_bssid or '', - ', '.join(c.probed_ssids), - c.first_seen.isoformat(), - c.last_seen.isoformat(), - ]) + writer.writerow( + [ + c.mac, + c.vendor or "", + c.rssi_current, + c.associated_bssid or "", + ", ".join(c.probed_ssids), + c.first_seen.isoformat(), + c.last_seen.isoformat(), + ] + ) - response = Response(output.getvalue(), mimetype='text/csv') - response.headers['Content-Disposition'] = f'attachment; filename=wifi_scan_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv' + response = Response(output.getvalue(), mimetype="text/csv") + response.headers["Content-Disposition"] = ( + f"attachment; filename=wifi_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" + ) return response