mirror of
https://github.com/smittix/intercept.git
synced 2026-06-11 23:43:30 -07:00
581 lines
17 KiB
Python
581 lines
17 KiB
Python
"""
|
|
WiFi v2 API routes.
|
|
|
|
New unified WiFi scanning API with Quick Scan and Deep Scan modes,
|
|
channel analysis, hidden SSID correlation, and SSE streaming.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import csv
|
|
import io
|
|
import json
|
|
import logging
|
|
from collections.abc import Generator
|
|
from datetime import datetime
|
|
|
|
from flask import Blueprint, Response, jsonify, request
|
|
|
|
from utils.event_pipeline import process_event
|
|
from utils.responses import api_error
|
|
from utils.sse import format_sse
|
|
from utils.validation import validate_wifi_channel
|
|
from utils.wifi import (
|
|
SCAN_MODE_DEEP,
|
|
analyze_channels,
|
|
get_hidden_correlator,
|
|
get_wifi_scanner,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
wifi_v2_bp = Blueprint("wifi_v2", __name__, url_prefix="/wifi/v2")
|
|
|
|
|
|
# =============================================================================
|
|
# Capabilities
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/capabilities", methods=["GET"])
|
|
def get_capabilities():
|
|
"""
|
|
Get WiFi scanning capabilities.
|
|
|
|
Returns available tools, interfaces, and scan mode support.
|
|
"""
|
|
scanner = get_wifi_scanner()
|
|
caps = scanner.check_capabilities()
|
|
return jsonify(caps.to_dict())
|
|
|
|
|
|
# =============================================================================
|
|
# Quick Scan
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/scan/quick", methods=["POST"])
|
|
def quick_scan():
|
|
"""
|
|
Perform a quick one-shot WiFi scan.
|
|
|
|
Uses system tools (nmcli, iw, iwlist, airport) without monitor mode.
|
|
|
|
Request body:
|
|
interface: Optional interface name
|
|
timeout: Optional scan timeout in seconds (default 15)
|
|
|
|
Returns:
|
|
WiFiScanResult with discovered networks and channel analysis.
|
|
"""
|
|
data = request.get_json() or {}
|
|
interface = data.get("interface")
|
|
timeout = float(data.get("timeout", 15))
|
|
|
|
scanner = get_wifi_scanner()
|
|
result = scanner.quick_scan(interface=interface, timeout=timeout)
|
|
|
|
return jsonify(result.to_dict())
|
|
|
|
|
|
# =============================================================================
|
|
# Deep Scan (Monitor Mode)
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/scan/start", methods=["POST"])
|
|
def start_deep_scan():
|
|
"""
|
|
Start a deep scan using airodump-ng.
|
|
|
|
Requires monitor mode interface and root privileges.
|
|
|
|
Request body:
|
|
interface: Monitor mode interface (e.g., 'wlan0mon')
|
|
band: Band to scan ('2.4', '5', 'all')
|
|
channel: Optional specific channel to monitor
|
|
channels: Optional list or comma-separated channels to monitor
|
|
"""
|
|
data = request.get_json() or {}
|
|
interface = data.get("interface")
|
|
band = data.get("band", "all")
|
|
channel = data.get("channel")
|
|
channels = data.get("channels")
|
|
|
|
channel_list = None
|
|
if channels:
|
|
if isinstance(channels, str):
|
|
channel_list = [c.strip() for c in channels.split(",") if c.strip()]
|
|
elif isinstance(channels, (list, tuple, set)):
|
|
channel_list = list(channels)
|
|
else:
|
|
channel_list = [channels]
|
|
try:
|
|
channel_list = [validate_wifi_channel(c) for c in channel_list]
|
|
except (TypeError, ValueError):
|
|
return api_error("Invalid channels", 400)
|
|
|
|
if channel:
|
|
try:
|
|
channel = validate_wifi_channel(channel)
|
|
except ValueError:
|
|
return api_error("Invalid channel", 400)
|
|
|
|
scanner = get_wifi_scanner()
|
|
success = scanner.start_deep_scan(
|
|
interface=interface,
|
|
band=band,
|
|
channel=channel,
|
|
channels=channel_list,
|
|
)
|
|
|
|
if success:
|
|
return jsonify(
|
|
{
|
|
"status": "started",
|
|
"mode": SCAN_MODE_DEEP,
|
|
"interface": interface or scanner._capabilities.monitor_interface,
|
|
}
|
|
)
|
|
else:
|
|
return api_error(scanner._status.error or "Scan failed", 400)
|
|
|
|
|
|
@wifi_v2_bp.route("/scan/stop", methods=["POST"])
|
|
def stop_deep_scan():
|
|
"""Stop the deep scan."""
|
|
scanner = get_wifi_scanner()
|
|
scanner.stop_deep_scan()
|
|
|
|
return jsonify(
|
|
{
|
|
"status": "stopped",
|
|
}
|
|
)
|
|
|
|
|
|
@wifi_v2_bp.route("/scan/status", methods=["GET"])
|
|
def get_scan_status():
|
|
"""Get current scan status."""
|
|
scanner = get_wifi_scanner()
|
|
status = scanner.get_status()
|
|
return jsonify(status.to_dict())
|
|
|
|
|
|
# =============================================================================
|
|
# Data Endpoints
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/networks", methods=["GET"])
|
|
def get_networks():
|
|
"""
|
|
Get all discovered networks.
|
|
|
|
Query params:
|
|
band: Filter by band ('2.4GHz', '5GHz', '6GHz')
|
|
security: Filter by security type ('Open', 'WEP', 'WPA', 'WPA2', 'WPA3')
|
|
hidden: Filter hidden networks only (true/false)
|
|
min_rssi: Minimum RSSI threshold
|
|
sort: Sort field ('rssi', 'channel', 'essid', 'last_seen')
|
|
order: Sort order ('asc', 'desc')
|
|
format: Response format ('full', 'summary')
|
|
"""
|
|
scanner = get_wifi_scanner()
|
|
networks = scanner.access_points
|
|
|
|
# Apply filters — single pass over the network list
|
|
band = request.args.get("band")
|
|
security = request.args.get("security")
|
|
hidden = request.args.get("hidden")
|
|
min_rssi_val: int | None = None
|
|
raw_min_rssi = request.args.get("min_rssi")
|
|
if raw_min_rssi:
|
|
try:
|
|
min_rssi_val = int(raw_min_rssi)
|
|
except ValueError:
|
|
pass
|
|
|
|
if band or security or hidden or min_rssi_val is not None:
|
|
|
|
def _matches(n: object) -> bool:
|
|
if band and n.band != band:
|
|
return False
|
|
if security and n.security != security:
|
|
return False
|
|
if hidden == "true" and not n.is_hidden:
|
|
return False
|
|
if hidden == "false" and n.is_hidden:
|
|
return False
|
|
if min_rssi_val is not None and (not n.rssi_current or n.rssi_current < min_rssi_val): # noqa: SIM103
|
|
return False
|
|
return True
|
|
|
|
networks = [n for n in networks if _matches(n)]
|
|
|
|
# Apply sorting
|
|
sort_field = request.args.get("sort", "rssi")
|
|
order = request.args.get("order", "desc")
|
|
reverse = order == "desc"
|
|
|
|
sort_key_map = {
|
|
"rssi": lambda n: n.rssi_current or -100,
|
|
"channel": lambda n: n.channel or 0,
|
|
"essid": lambda n: (n.essid or "").lower(),
|
|
"last_seen": lambda n: n.last_seen,
|
|
"clients": lambda n: n.client_count,
|
|
}
|
|
|
|
if sort_field in sort_key_map:
|
|
networks.sort(key=sort_key_map[sort_field], reverse=reverse)
|
|
|
|
# Format output
|
|
output_format = request.args.get("format", "summary")
|
|
if output_format == "full":
|
|
return jsonify([n.to_dict() for n in networks])
|
|
else:
|
|
return jsonify([n.to_summary_dict() for n in networks])
|
|
|
|
|
|
@wifi_v2_bp.route("/networks/<bssid>", methods=["GET"])
|
|
def get_network(bssid):
|
|
"""Get a specific network by BSSID."""
|
|
scanner = get_wifi_scanner()
|
|
network = scanner.get_network(bssid)
|
|
|
|
if network:
|
|
return jsonify(network.to_dict())
|
|
else:
|
|
return api_error("Network not found", 404)
|
|
|
|
|
|
@wifi_v2_bp.route("/clients", methods=["GET"])
|
|
def get_clients():
|
|
"""
|
|
Get all discovered clients.
|
|
|
|
Query params:
|
|
associated: Filter by association status (true/false)
|
|
bssid: Filter by associated BSSID
|
|
min_rssi: Minimum RSSI threshold
|
|
"""
|
|
scanner = get_wifi_scanner()
|
|
clients = scanner.clients
|
|
|
|
# Apply filters
|
|
associated = request.args.get("associated")
|
|
if associated == "true":
|
|
clients = [c for c in clients if c.is_associated]
|
|
elif associated == "false":
|
|
clients = [c for c in clients if not c.is_associated]
|
|
|
|
bssid = request.args.get("bssid")
|
|
if bssid:
|
|
clients = [c for c in clients if c.associated_bssid == bssid.upper()]
|
|
|
|
min_rssi = request.args.get("min_rssi")
|
|
if min_rssi:
|
|
try:
|
|
min_rssi = int(min_rssi)
|
|
clients = [c for c in clients if c.rssi_current and c.rssi_current >= min_rssi]
|
|
except ValueError:
|
|
pass
|
|
|
|
return jsonify([c.to_dict() for c in clients])
|
|
|
|
|
|
@wifi_v2_bp.route("/clients/<mac>", methods=["GET"])
|
|
def get_client(mac):
|
|
"""Get a specific client by MAC address."""
|
|
scanner = get_wifi_scanner()
|
|
client = scanner.get_client(mac)
|
|
|
|
if client:
|
|
return jsonify(client.to_dict())
|
|
else:
|
|
return api_error("Client not found", 404)
|
|
|
|
|
|
@wifi_v2_bp.route("/probes", methods=["GET"])
|
|
def get_probes():
|
|
"""
|
|
Get captured probe requests.
|
|
|
|
Query params:
|
|
client_mac: Filter by client MAC
|
|
ssid: Filter by probed SSID
|
|
limit: Maximum number of results
|
|
"""
|
|
scanner = get_wifi_scanner()
|
|
probes = scanner.probe_requests
|
|
|
|
# Apply filters
|
|
client_mac = request.args.get("client_mac")
|
|
if client_mac:
|
|
probes = [p for p in probes if p.client_mac == client_mac.upper()]
|
|
|
|
ssid = request.args.get("ssid")
|
|
if ssid:
|
|
probes = [p for p in probes if p.probed_ssid == ssid]
|
|
|
|
# Apply limit
|
|
limit = request.args.get("limit")
|
|
if limit:
|
|
try:
|
|
limit = int(limit)
|
|
probes = probes[-limit:] # Most recent
|
|
except ValueError:
|
|
pass
|
|
|
|
return jsonify([p.to_dict() for p in probes])
|
|
|
|
|
|
# =============================================================================
|
|
# Channel Analysis
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/channels", methods=["GET"])
|
|
def get_channel_stats():
|
|
"""
|
|
Get channel utilization statistics and recommendations.
|
|
|
|
Query params:
|
|
include_dfs: Include DFS channels in recommendations (true/false)
|
|
"""
|
|
scanner = get_wifi_scanner()
|
|
include_dfs = request.args.get("include_dfs", "false") == "true"
|
|
|
|
stats, recommendations = analyze_channels(
|
|
scanner.access_points,
|
|
include_dfs=include_dfs,
|
|
)
|
|
|
|
return jsonify(
|
|
{
|
|
"stats": [s.to_dict() for s in stats],
|
|
"recommendations": [r.to_dict() for r in recommendations],
|
|
}
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Hidden SSID Correlation
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/hidden", methods=["GET"])
|
|
def get_hidden_correlations():
|
|
"""
|
|
Get revealed hidden SSIDs from correlation.
|
|
|
|
Returns mapping of BSSID -> revealed SSID.
|
|
"""
|
|
correlator = get_hidden_correlator()
|
|
return jsonify(correlator.get_all_revealed())
|
|
|
|
|
|
# =============================================================================
|
|
# Baseline Management
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/baseline/set", methods=["POST"])
|
|
def set_baseline():
|
|
"""Mark current networks as baseline (known networks)."""
|
|
scanner = get_wifi_scanner()
|
|
scanner.set_baseline()
|
|
|
|
return jsonify(
|
|
{
|
|
"status": "baseline_set",
|
|
"network_count": len(scanner._baseline_networks),
|
|
"set_at": datetime.now().isoformat(),
|
|
}
|
|
)
|
|
|
|
|
|
@wifi_v2_bp.route("/baseline/clear", methods=["POST"])
|
|
def clear_baseline():
|
|
"""Clear the baseline."""
|
|
scanner = get_wifi_scanner()
|
|
scanner.clear_baseline()
|
|
|
|
return jsonify(
|
|
{
|
|
"status": "baseline_cleared",
|
|
}
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# SSE Streaming
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/stream", methods=["GET"])
|
|
def event_stream():
|
|
"""
|
|
Server-Sent Events stream for real-time updates.
|
|
|
|
Events:
|
|
- network_update: Network discovered/updated
|
|
- client_update: Client discovered/updated
|
|
- probe_request: Probe request detected
|
|
- hidden_revealed: Hidden SSID revealed
|
|
- scan_started, scan_stopped, scan_error
|
|
- keepalive: Periodic keepalive
|
|
"""
|
|
|
|
def generate() -> Generator[str, None, None]:
|
|
scanner = get_wifi_scanner()
|
|
|
|
for event in scanner.get_event_stream():
|
|
with contextlib.suppress(Exception):
|
|
process_event("wifi", event, event.get("type"))
|
|
yield format_sse(event)
|
|
|
|
response = Response(generate(), mimetype="text/event-stream")
|
|
response.headers["Cache-Control"] = "no-cache"
|
|
response.headers["X-Accel-Buffering"] = "no"
|
|
return response
|
|
|
|
|
|
# =============================================================================
|
|
# Data Management
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/clear", methods=["POST"])
|
|
def clear_data():
|
|
"""Clear all discovered data."""
|
|
scanner = get_wifi_scanner()
|
|
scanner.clear_data()
|
|
|
|
return jsonify(
|
|
{
|
|
"status": "cleared",
|
|
}
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Export
|
|
# =============================================================================
|
|
|
|
|
|
@wifi_v2_bp.route("/export", methods=["GET"])
|
|
def export_data():
|
|
"""
|
|
Export scan data.
|
|
|
|
Query params:
|
|
format: 'json' or 'csv' (default: json)
|
|
type: 'networks', 'clients', 'probes', 'all' (default: all)
|
|
"""
|
|
scanner = get_wifi_scanner()
|
|
export_format = request.args.get("format", "json")
|
|
export_type = request.args.get("type", "all")
|
|
|
|
if export_format == "csv":
|
|
return _export_csv(scanner, export_type)
|
|
else:
|
|
return _export_json(scanner, export_type)
|
|
|
|
|
|
def _export_json(scanner, export_type: str) -> Response:
|
|
"""Export data as JSON."""
|
|
data = {}
|
|
|
|
if export_type in ("networks", "all"):
|
|
data["networks"] = [n.to_dict() for n in scanner.access_points]
|
|
|
|
if export_type in ("clients", "all"):
|
|
data["clients"] = [c.to_dict() for c in scanner.clients]
|
|
|
|
if export_type in ("probes", "all"):
|
|
data["probes"] = [p.to_dict() for p in scanner.probe_requests]
|
|
|
|
data["exported_at"] = datetime.now().isoformat()
|
|
data["network_count"] = len(scanner.access_points)
|
|
data["client_count"] = len(scanner.clients)
|
|
|
|
response = Response(
|
|
json.dumps(data, indent=2),
|
|
mimetype="application/json",
|
|
)
|
|
response.headers["Content-Disposition"] = (
|
|
f"attachment; filename=wifi_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
|
)
|
|
return response
|
|
|
|
|
|
def _export_csv(scanner, export_type: str) -> Response:
|
|
"""Export data as CSV."""
|
|
output = io.StringIO()
|
|
|
|
if export_type in ("networks", "all"):
|
|
writer = csv.writer(output)
|
|
writer.writerow(
|
|
[
|
|
"BSSID",
|
|
"ESSID",
|
|
"Channel",
|
|
"Band",
|
|
"RSSI",
|
|
"Security",
|
|
"Cipher",
|
|
"Auth",
|
|
"Vendor",
|
|
"Clients",
|
|
"First Seen",
|
|
"Last Seen",
|
|
]
|
|
)
|
|
|
|
for n in scanner.access_points:
|
|
writer.writerow(
|
|
[
|
|
n.bssid,
|
|
n.essid or "[Hidden]",
|
|
n.channel,
|
|
n.band,
|
|
n.rssi_current,
|
|
n.security,
|
|
n.cipher,
|
|
n.auth,
|
|
n.vendor or "",
|
|
n.client_count,
|
|
n.first_seen.isoformat(),
|
|
n.last_seen.isoformat(),
|
|
]
|
|
)
|
|
|
|
if export_type == "all":
|
|
writer.writerow([]) # Blank line separator
|
|
|
|
if export_type in ("clients", "all"):
|
|
writer = csv.writer(output)
|
|
if export_type == "clients":
|
|
writer.writerow(["MAC", "Vendor", "RSSI", "Associated BSSID", "Probed SSIDs", "First Seen", "Last Seen"])
|
|
|
|
for c in scanner.clients:
|
|
writer.writerow(
|
|
[
|
|
c.mac,
|
|
c.vendor or "",
|
|
c.rssi_current,
|
|
c.associated_bssid or "",
|
|
", ".join(c.probed_ssids),
|
|
c.first_seen.isoformat(),
|
|
c.last_seen.isoformat(),
|
|
]
|
|
)
|
|
|
|
response = Response(output.getvalue(), mimetype="text/csv")
|
|
response.headers["Content-Disposition"] = (
|
|
f"attachment; filename=wifi_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
|
)
|
|
return response
|