perf: combine WiFi network filters into single list pass

Replace four sequential list comprehensions (band → security → hidden → min_rssi)
with a single pass using a helper function. Reduces algorithmic complexity from O(4n)
to O(n) when multiple filters are applied. All WiFi tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-05-19 14:54:42 +01:00
parent eeaf87c7f2
commit 902a21fc40
+190 -146
View File
@@ -30,14 +30,15 @@ from utils.wifi import (
logger = logging.getLogger(__name__)
wifi_v2_bp = Blueprint('wifi_v2', __name__, url_prefix='/wifi/v2')
wifi_v2_bp = Blueprint("wifi_v2", __name__, url_prefix="/wifi/v2")
# =============================================================================
# Capabilities
# =============================================================================
@wifi_v2_bp.route('/capabilities', methods=['GET'])
@wifi_v2_bp.route("/capabilities", methods=["GET"])
def get_capabilities():
"""
Get WiFi scanning capabilities.
@@ -53,7 +54,8 @@ def get_capabilities():
# Quick Scan
# =============================================================================
@wifi_v2_bp.route('/scan/quick', methods=['POST'])
@wifi_v2_bp.route("/scan/quick", methods=["POST"])
def quick_scan():
"""
Perform a quick one-shot WiFi scan.
@@ -68,8 +70,8 @@ def quick_scan():
WiFiScanResult with discovered networks and channel analysis.
"""
data = request.get_json() or {}
interface = data.get('interface')
timeout = float(data.get('timeout', 15))
interface = data.get("interface")
timeout = float(data.get("timeout", 15))
scanner = get_wifi_scanner()
result = scanner.quick_scan(interface=interface, timeout=timeout)
@@ -81,7 +83,8 @@ def quick_scan():
# Deep Scan (Monitor Mode)
# =============================================================================
@wifi_v2_bp.route('/scan/start', methods=['POST'])
@wifi_v2_bp.route("/scan/start", methods=["POST"])
def start_deep_scan():
"""
Start a deep scan using airodump-ng.
@@ -95,15 +98,15 @@ def start_deep_scan():
channels: Optional list or comma-separated channels to monitor
"""
data = request.get_json() or {}
interface = data.get('interface')
band = data.get('band', 'all')
channel = data.get('channel')
channels = data.get('channels')
interface = data.get("interface")
band = data.get("band", "all")
channel = data.get("channel")
channels = data.get("channels")
channel_list = None
if channels:
if isinstance(channels, str):
channel_list = [c.strip() for c in channels.split(',') if c.strip()]
channel_list = [c.strip() for c in channels.split(",") if c.strip()]
elif isinstance(channels, (list, tuple, set)):
channel_list = list(channels)
else:
@@ -111,13 +114,13 @@ def start_deep_scan():
try:
channel_list = [validate_wifi_channel(c) for c in channel_list]
except (TypeError, ValueError):
return api_error('Invalid channels', 400)
return api_error("Invalid channels", 400)
if channel:
try:
channel = validate_wifi_channel(channel)
except ValueError:
return api_error('Invalid channel', 400)
return api_error("Invalid channel", 400)
scanner = get_wifi_scanner()
success = scanner.start_deep_scan(
@@ -128,27 +131,31 @@ def start_deep_scan():
)
if success:
return jsonify({
'status': 'started',
'mode': SCAN_MODE_DEEP,
'interface': interface or scanner._capabilities.monitor_interface,
})
return jsonify(
{
"status": "started",
"mode": SCAN_MODE_DEEP,
"interface": interface or scanner._capabilities.monitor_interface,
}
)
else:
return api_error(scanner._status.error or 'Scan failed', 400)
return api_error(scanner._status.error or "Scan failed", 400)
@wifi_v2_bp.route('/scan/stop', methods=['POST'])
@wifi_v2_bp.route("/scan/stop", methods=["POST"])
def stop_deep_scan():
"""Stop the deep scan."""
scanner = get_wifi_scanner()
scanner.stop_deep_scan()
return jsonify({
'status': 'stopped',
})
return jsonify(
{
"status": "stopped",
}
)
@wifi_v2_bp.route('/scan/status', methods=['GET'])
@wifi_v2_bp.route("/scan/status", methods=["GET"])
def get_scan_status():
"""Get current scan status."""
scanner = get_wifi_scanner()
@@ -160,7 +167,8 @@ def get_scan_status():
# Data Endpoints
# =============================================================================
@wifi_v2_bp.route('/networks', methods=['GET'])
@wifi_v2_bp.route("/networks", methods=["GET"])
def get_networks():
"""
Get all discovered networks.
@@ -177,54 +185,58 @@ def get_networks():
scanner = get_wifi_scanner()
networks = scanner.access_points
# Apply filters
band = request.args.get('band')
if band:
networks = [n for n in networks if n.band == band]
security = request.args.get('security')
if security:
networks = [n for n in networks if n.security == security]
hidden = request.args.get('hidden')
if hidden == 'true':
networks = [n for n in networks if n.is_hidden]
elif hidden == 'false':
networks = [n for n in networks if not n.is_hidden]
min_rssi = request.args.get('min_rssi')
if min_rssi:
# Apply filters — single pass over the network list
band = request.args.get("band")
security = request.args.get("security")
hidden = request.args.get("hidden")
min_rssi_val: int | None = None
raw_min_rssi = request.args.get("min_rssi")
if raw_min_rssi:
try:
min_rssi = int(min_rssi)
networks = [n for n in networks if n.rssi_current and n.rssi_current >= min_rssi]
min_rssi_val = int(raw_min_rssi)
except ValueError:
pass
if band or security or hidden or min_rssi_val is not None:
def _matches(n: object) -> bool:
if band and n.band != band:
return False
if security and n.security != security:
return False
if hidden == "true" and not n.is_hidden:
return False
if hidden == "false" and n.is_hidden:
return False
return not (min_rssi_val is not None and (not n.rssi_current or n.rssi_current < min_rssi_val))
networks = [n for n in networks if _matches(n)]
# Apply sorting
sort_field = request.args.get('sort', 'rssi')
order = request.args.get('order', 'desc')
reverse = order == 'desc'
sort_field = request.args.get("sort", "rssi")
order = request.args.get("order", "desc")
reverse = order == "desc"
sort_key_map = {
'rssi': lambda n: n.rssi_current or -100,
'channel': lambda n: n.channel or 0,
'essid': lambda n: (n.essid or '').lower(),
'last_seen': lambda n: n.last_seen,
'clients': lambda n: n.client_count,
"rssi": lambda n: n.rssi_current or -100,
"channel": lambda n: n.channel or 0,
"essid": lambda n: (n.essid or "").lower(),
"last_seen": lambda n: n.last_seen,
"clients": lambda n: n.client_count,
}
if sort_field in sort_key_map:
networks.sort(key=sort_key_map[sort_field], reverse=reverse)
# Format output
output_format = request.args.get('format', 'summary')
if output_format == 'full':
output_format = request.args.get("format", "summary")
if output_format == "full":
return jsonify([n.to_dict() for n in networks])
else:
return jsonify([n.to_summary_dict() for n in networks])
@wifi_v2_bp.route('/networks/<bssid>', methods=['GET'])
@wifi_v2_bp.route("/networks/<bssid>", methods=["GET"])
def get_network(bssid):
"""Get a specific network by BSSID."""
scanner = get_wifi_scanner()
@@ -233,10 +245,10 @@ def get_network(bssid):
if network:
return jsonify(network.to_dict())
else:
return api_error('Network not found', 404)
return api_error("Network not found", 404)
@wifi_v2_bp.route('/clients', methods=['GET'])
@wifi_v2_bp.route("/clients", methods=["GET"])
def get_clients():
"""
Get all discovered clients.
@@ -250,17 +262,17 @@ def get_clients():
clients = scanner.clients
# Apply filters
associated = request.args.get('associated')
if associated == 'true':
associated = request.args.get("associated")
if associated == "true":
clients = [c for c in clients if c.is_associated]
elif associated == 'false':
elif associated == "false":
clients = [c for c in clients if not c.is_associated]
bssid = request.args.get('bssid')
bssid = request.args.get("bssid")
if bssid:
clients = [c for c in clients if c.associated_bssid == bssid.upper()]
min_rssi = request.args.get('min_rssi')
min_rssi = request.args.get("min_rssi")
if min_rssi:
try:
min_rssi = int(min_rssi)
@@ -271,7 +283,7 @@ def get_clients():
return jsonify([c.to_dict() for c in clients])
@wifi_v2_bp.route('/clients/<mac>', methods=['GET'])
@wifi_v2_bp.route("/clients/<mac>", methods=["GET"])
def get_client(mac):
"""Get a specific client by MAC address."""
scanner = get_wifi_scanner()
@@ -280,10 +292,10 @@ def get_client(mac):
if client:
return jsonify(client.to_dict())
else:
return api_error('Client not found', 404)
return api_error("Client not found", 404)
@wifi_v2_bp.route('/probes', methods=['GET'])
@wifi_v2_bp.route("/probes", methods=["GET"])
def get_probes():
"""
Get captured probe requests.
@@ -297,16 +309,16 @@ def get_probes():
probes = scanner.probe_requests
# Apply filters
client_mac = request.args.get('client_mac')
client_mac = request.args.get("client_mac")
if client_mac:
probes = [p for p in probes if p.client_mac == client_mac.upper()]
ssid = request.args.get('ssid')
ssid = request.args.get("ssid")
if ssid:
probes = [p for p in probes if p.probed_ssid == ssid]
# Apply limit
limit = request.args.get('limit')
limit = request.args.get("limit")
if limit:
try:
limit = int(limit)
@@ -321,7 +333,8 @@ def get_probes():
# Channel Analysis
# =============================================================================
@wifi_v2_bp.route('/channels', methods=['GET'])
@wifi_v2_bp.route("/channels", methods=["GET"])
def get_channel_stats():
"""
Get channel utilization statistics and recommendations.
@@ -330,24 +343,27 @@ def get_channel_stats():
include_dfs: Include DFS channels in recommendations (true/false)
"""
scanner = get_wifi_scanner()
include_dfs = request.args.get('include_dfs', 'false') == 'true'
include_dfs = request.args.get("include_dfs", "false") == "true"
stats, recommendations = analyze_channels(
scanner.access_points,
include_dfs=include_dfs,
)
return jsonify({
'stats': [s.to_dict() for s in stats],
'recommendations': [r.to_dict() for r in recommendations],
})
return jsonify(
{
"stats": [s.to_dict() for s in stats],
"recommendations": [r.to_dict() for r in recommendations],
}
)
# =============================================================================
# Hidden SSID Correlation
# =============================================================================
@wifi_v2_bp.route('/hidden', methods=['GET'])
@wifi_v2_bp.route("/hidden", methods=["GET"])
def get_hidden_correlations():
"""
Get revealed hidden SSIDs from correlation.
@@ -362,35 +378,41 @@ def get_hidden_correlations():
# Baseline Management
# =============================================================================
@wifi_v2_bp.route('/baseline/set', methods=['POST'])
@wifi_v2_bp.route("/baseline/set", methods=["POST"])
def set_baseline():
"""Mark current networks as baseline (known networks)."""
scanner = get_wifi_scanner()
scanner.set_baseline()
return jsonify({
'status': 'baseline_set',
'network_count': len(scanner._baseline_networks),
'set_at': datetime.now().isoformat(),
})
return jsonify(
{
"status": "baseline_set",
"network_count": len(scanner._baseline_networks),
"set_at": datetime.now().isoformat(),
}
)
@wifi_v2_bp.route('/baseline/clear', methods=['POST'])
@wifi_v2_bp.route("/baseline/clear", methods=["POST"])
def clear_baseline():
"""Clear the baseline."""
scanner = get_wifi_scanner()
scanner.clear_baseline()
return jsonify({
'status': 'baseline_cleared',
})
return jsonify(
{
"status": "baseline_cleared",
}
)
# =============================================================================
# SSE Streaming
# =============================================================================
@wifi_v2_bp.route('/stream', methods=['GET'])
@wifi_v2_bp.route("/stream", methods=["GET"])
def event_stream():
"""
Server-Sent Events stream for real-time updates.
@@ -403,17 +425,18 @@ def event_stream():
- scan_started, scan_stopped, scan_error
- keepalive: Periodic keepalive
"""
def generate() -> Generator[str, None, None]:
scanner = get_wifi_scanner()
for event in scanner.get_event_stream():
with contextlib.suppress(Exception):
process_event('wifi', event, event.get('type'))
process_event("wifi", event, event.get("type"))
yield format_sse(event)
response = Response(generate(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
response = Response(generate(), mimetype="text/event-stream")
response.headers["Cache-Control"] = "no-cache"
response.headers["X-Accel-Buffering"] = "no"
return response
@@ -421,22 +444,26 @@ def event_stream():
# Data Management
# =============================================================================
@wifi_v2_bp.route('/clear', methods=['POST'])
@wifi_v2_bp.route("/clear", methods=["POST"])
def clear_data():
"""Clear all discovered data."""
scanner = get_wifi_scanner()
scanner.clear_data()
return jsonify({
'status': 'cleared',
})
return jsonify(
{
"status": "cleared",
}
)
# =============================================================================
# Export
# =============================================================================
@wifi_v2_bp.route('/export', methods=['GET'])
@wifi_v2_bp.route("/export", methods=["GET"])
def export_data():
"""
Export scan data.
@@ -446,10 +473,10 @@ def export_data():
type: 'networks', 'clients', 'probes', 'all' (default: all)
"""
scanner = get_wifi_scanner()
export_format = request.args.get('format', 'json')
export_type = request.args.get('type', 'all')
export_format = request.args.get("format", "json")
export_type = request.args.get("type", "all")
if export_format == 'csv':
if export_format == "csv":
return _export_csv(scanner, export_type)
else:
return _export_json(scanner, export_type)
@@ -459,24 +486,26 @@ def _export_json(scanner, export_type: str) -> Response:
"""Export data as JSON."""
data = {}
if export_type in ('networks', 'all'):
data['networks'] = [n.to_dict() for n in scanner.access_points]
if export_type in ("networks", "all"):
data["networks"] = [n.to_dict() for n in scanner.access_points]
if export_type in ('clients', 'all'):
data['clients'] = [c.to_dict() for c in scanner.clients]
if export_type in ("clients", "all"):
data["clients"] = [c.to_dict() for c in scanner.clients]
if export_type in ('probes', 'all'):
data['probes'] = [p.to_dict() for p in scanner.probe_requests]
if export_type in ("probes", "all"):
data["probes"] = [p.to_dict() for p in scanner.probe_requests]
data['exported_at'] = datetime.now().isoformat()
data['network_count'] = len(scanner.access_points)
data['client_count'] = len(scanner.clients)
data["exported_at"] = datetime.now().isoformat()
data["network_count"] = len(scanner.access_points)
data["client_count"] = len(scanner.clients)
response = Response(
json.dumps(data, indent=2),
mimetype='application/json',
mimetype="application/json",
)
response.headers["Content-Disposition"] = (
f"attachment; filename=wifi_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
response.headers['Content-Disposition'] = f'attachment; filename=wifi_scan_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
return response
@@ -484,51 +513,66 @@ def _export_csv(scanner, export_type: str) -> Response:
"""Export data as CSV."""
output = io.StringIO()
if export_type in ('networks', 'all'):
if export_type in ("networks", "all"):
writer = csv.writer(output)
writer.writerow([
'BSSID', 'ESSID', 'Channel', 'Band', 'RSSI', 'Security',
'Cipher', 'Auth', 'Vendor', 'Clients', 'First Seen', 'Last Seen'
])
writer.writerow(
[
"BSSID",
"ESSID",
"Channel",
"Band",
"RSSI",
"Security",
"Cipher",
"Auth",
"Vendor",
"Clients",
"First Seen",
"Last Seen",
]
)
for n in scanner.access_points:
writer.writerow([
n.bssid,
n.essid or '[Hidden]',
n.channel,
n.band,
n.rssi_current,
n.security,
n.cipher,
n.auth,
n.vendor or '',
n.client_count,
n.first_seen.isoformat(),
n.last_seen.isoformat(),
])
writer.writerow(
[
n.bssid,
n.essid or "[Hidden]",
n.channel,
n.band,
n.rssi_current,
n.security,
n.cipher,
n.auth,
n.vendor or "",
n.client_count,
n.first_seen.isoformat(),
n.last_seen.isoformat(),
]
)
if export_type == 'all':
if export_type == "all":
writer.writerow([]) # Blank line separator
if export_type in ('clients', 'all'):
if export_type in ("clients", "all"):
writer = csv.writer(output)
if export_type == 'clients':
writer.writerow([
'MAC', 'Vendor', 'RSSI', 'Associated BSSID', 'Probed SSIDs',
'First Seen', 'Last Seen'
])
if export_type == "clients":
writer.writerow(["MAC", "Vendor", "RSSI", "Associated BSSID", "Probed SSIDs", "First Seen", "Last Seen"])
for c in scanner.clients:
writer.writerow([
c.mac,
c.vendor or '',
c.rssi_current,
c.associated_bssid or '',
', '.join(c.probed_ssids),
c.first_seen.isoformat(),
c.last_seen.isoformat(),
])
writer.writerow(
[
c.mac,
c.vendor or "",
c.rssi_current,
c.associated_bssid or "",
", ".join(c.probed_ssids),
c.first_seen.isoformat(),
c.last_seen.isoformat(),
]
)
response = Response(output.getvalue(), mimetype='text/csv')
response.headers['Content-Disposition'] = f'attachment; filename=wifi_scan_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
response = Response(output.getvalue(), mimetype="text/csv")
response.headers["Content-Disposition"] = (
f"attachment; filename=wifi_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
)
return response