Files
intercept/utils/wifi/deauth_detector.py
Smittix e00fbfddc1 v2.26.0: fix SSE fanout crash and branded logo FOUC
- Fix SSE fanout thread AttributeError when source queue is None during
  interpreter shutdown by snapshotting to local variable with null guard
- Fix branded "i" logo rendering oversized on first page load (FOUC) by
  adding inline width/height to SVG elements across 10 templates
- Bump version to 2.26.0 in config.py, pyproject.toml, and CHANGELOG.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 11:51:27 +00:00

615 lines
20 KiB
Python

"""
Deauthentication attack detector using scapy.
Monitors a WiFi interface in monitor mode for deauthentication and disassociation
frames, detecting potential deauth flood attacks.
"""
from __future__ import annotations
import contextlib
import logging
import threading
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable
from utils.constants import (
DEAUTH_ALERT_THRESHOLD,
DEAUTH_CRITICAL_THRESHOLD,
DEAUTH_DETECTION_WINDOW,
DEAUTH_SNIFF_TIMEOUT,
)
logger = logging.getLogger(__name__)
# Deauth reason code descriptions
DEAUTH_REASON_CODES = {
0: "Reserved",
1: "Unspecified reason",
2: "Previous authentication no longer valid",
3: "Station is leaving (or has left) IBSS or ESS",
4: "Disassociated due to inactivity",
5: "Disassociated because AP is unable to handle all currently associated STAs",
6: "Class 2 frame received from nonauthenticated STA",
7: "Class 3 frame received from nonassociated STA",
8: "Disassociated because sending STA is leaving (or has left) BSS",
9: "STA requesting (re)association is not authenticated with responding STA",
10: "Disassociated because the information in the Power Capability element is unacceptable",
11: "Disassociated because the information in the Supported Channels element is unacceptable",
12: "Disassociated due to BSS Transition Management",
13: "Invalid information element",
14: "MIC failure",
15: "4-Way Handshake timeout",
16: "Group Key Handshake timeout",
17: "Information element in 4-Way Handshake different from (Re)Association Request/Probe Response/Beacon frame",
18: "Invalid group cipher",
19: "Invalid pairwise cipher",
20: "Invalid AKMP",
21: "Unsupported RSNE version",
22: "Invalid RSNE capabilities",
23: "IEEE 802.1X authentication failed",
24: "Cipher suite rejected because of security policy",
}
@dataclass
class DeauthPacketInfo:
"""Information about a captured deauth/disassoc packet."""
timestamp: float
frame_type: str # 'deauth' or 'disassoc'
src_mac: str
dst_mac: str
bssid: str
reason_code: int
signal_dbm: int | None = None
@dataclass
class DeauthTracker:
"""Tracks deauth packets for a specific source/dest/bssid combination."""
packets: list[DeauthPacketInfo] = field(default_factory=list)
first_seen: float = 0.0
last_seen: float = 0.0
alert_sent: bool = False
def add_packet(self, pkt: DeauthPacketInfo):
self.packets.append(pkt)
now = pkt.timestamp
if self.first_seen == 0.0:
self.first_seen = now
self.last_seen = now
def get_packets_in_window(self, window_seconds: float) -> list[DeauthPacketInfo]:
"""Get packets within the time window."""
cutoff = time.time() - window_seconds
return [p for p in self.packets if p.timestamp >= cutoff]
def cleanup_old_packets(self, window_seconds: float):
"""Remove packets older than the window."""
cutoff = time.time() - window_seconds
self.packets = [p for p in self.packets if p.timestamp >= cutoff]
if self.packets:
self.first_seen = self.packets[0].timestamp
else:
self.first_seen = 0.0
self.alert_sent = False
@dataclass
class DeauthAlert:
"""A deauthentication attack alert."""
id: str
timestamp: float
severity: str # 'low', 'medium', 'high'
# Attacker info
attacker_mac: str
attacker_vendor: str | None
attacker_signal_dbm: int | None
is_spoofed_ap: bool
# Target info
target_mac: str
target_vendor: str | None
target_type: str # 'client', 'broadcast', 'ap'
target_known_from_scan: bool
# Access point info
ap_bssid: str
ap_essid: str | None
ap_channel: int | None
# Attack info
frame_type: str
reason_code: int
reason_text: str
packet_count: int
window_seconds: float
packets_per_second: float
# Analysis
attack_type: str # 'targeted', 'broadcast', 'ap_flood'
description: str
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'id': self.id,
'type': 'deauth_alert',
'timestamp': self.timestamp,
'severity': self.severity,
'attacker': {
'mac': self.attacker_mac,
'vendor': self.attacker_vendor,
'signal_dbm': self.attacker_signal_dbm,
'is_spoofed_ap': self.is_spoofed_ap,
},
'target': {
'mac': self.target_mac,
'vendor': self.target_vendor,
'type': self.target_type,
'known_from_scan': self.target_known_from_scan,
},
'access_point': {
'bssid': self.ap_bssid,
'essid': self.ap_essid,
'channel': self.ap_channel,
},
'attack_info': {
'frame_type': self.frame_type,
'reason_code': self.reason_code,
'reason_text': self.reason_text,
'packet_count': self.packet_count,
'window_seconds': self.window_seconds,
'packets_per_second': self.packets_per_second,
},
'analysis': {
'attack_type': self.attack_type,
'description': self.description,
},
}
class DeauthDetector:
"""
Detects deauthentication attacks using scapy.
Monitors a WiFi interface in monitor mode for deauth/disassoc frames
and emits alerts when attack thresholds are exceeded.
"""
def __init__(
self,
interface: str,
event_callback: Callable[[dict], None],
get_networks: Callable[[], dict[str, Any]] | None = None,
get_clients: Callable[[], dict[str, Any]] | None = None,
):
"""
Initialize the deauth detector.
Args:
interface: Monitor mode interface to sniff on
event_callback: Callback function to receive alert events
get_networks: Optional function to get current WiFi networks (bssid -> network_info)
get_clients: Optional function to get current WiFi clients (mac -> client_info)
"""
self.interface = interface
self.event_callback = event_callback
self.get_networks = get_networks
self.get_clients = get_clients
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._lock = threading.Lock()
# Track deauth packets by (src, dst, bssid) tuple
self._trackers: dict[tuple[str, str, str], DeauthTracker] = defaultdict(DeauthTracker)
# Alert history
self._alerts: list[DeauthAlert] = []
self._alert_counter = 0
# Stats
self._packets_captured = 0
self._alerts_generated = 0
self._started_at: float | None = None
@property
def is_running(self) -> bool:
"""Check if detector is running."""
return self._thread is not None and self._thread.is_alive()
@property
def stats(self) -> dict:
"""Get detector statistics."""
return {
'is_running': self.is_running,
'interface': self.interface,
'started_at': self._started_at,
'packets_captured': self._packets_captured,
'alerts_generated': self._alerts_generated,
'active_trackers': len(self._trackers),
}
def start(self) -> bool:
"""
Start detection in background thread.
Returns:
True if started successfully.
"""
if self.is_running:
logger.warning("Deauth detector already running")
return True
self._stop_event.clear()
self._started_at = time.time()
self._thread = threading.Thread(
target=self._sniff_loop,
name="DeauthDetector",
daemon=True,
)
self._thread.start()
logger.info(f"Deauth detector started on {self.interface}")
return True
def stop(self) -> bool:
"""
Stop detection.
Returns:
True if stopped successfully.
"""
if not self.is_running:
return True
logger.info("Stopping deauth detector...")
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5)
if self._thread.is_alive():
logger.warning("Deauth detector thread did not stop cleanly")
self._thread = None
self._started_at = None
logger.info("Deauth detector stopped")
return True
def get_alerts(self, limit: int = 100) -> list[dict]:
"""Get recent alerts."""
with self._lock:
return [a.to_dict() for a in self._alerts[-limit:]]
def clear_alerts(self):
"""Clear alert history."""
with self._lock:
self._alerts.clear()
self._trackers.clear()
self._alert_counter = 0
def _sniff_loop(self):
"""Main sniffing loop using scapy."""
try:
from scapy.all import Dot11, Dot11Deauth, Dot11Disas, sniff
except ImportError:
logger.error("scapy not installed. Install with: pip install scapy")
self.event_callback({
'type': 'deauth_error',
'error': 'scapy not installed',
})
return
logger.info(f"Starting deauth sniff on {self.interface}")
def packet_handler(pkt):
"""Handle each captured packet."""
if self._stop_event.is_set():
return
# Check for deauth or disassoc frames
if pkt.haslayer(Dot11Deauth) or pkt.haslayer(Dot11Disas):
self._process_deauth_packet(pkt)
try:
# Use stop_filter to allow clean shutdown
sniff(
iface=self.interface,
prn=packet_handler,
store=False,
stop_filter=lambda _: self._stop_event.is_set(),
timeout=DEAUTH_SNIFF_TIMEOUT,
)
# Continue sniffing until stop is requested
while not self._stop_event.is_set():
sniff(
iface=self.interface,
prn=packet_handler,
store=False,
stop_filter=lambda _: self._stop_event.is_set(),
timeout=DEAUTH_SNIFF_TIMEOUT,
)
# Periodic cleanup
self._cleanup_old_trackers()
except OSError as e:
if "No such device" in str(e):
logger.error(f"Interface {self.interface} not found")
self.event_callback({
'type': 'deauth_error',
'error': f'Interface {self.interface} not found',
})
else:
logger.exception(f"Sniff error: {e}")
self.event_callback({
'type': 'deauth_error',
'error': str(e),
})
except Exception as e:
logger.exception(f"Sniff error: {e}")
self.event_callback({
'type': 'deauth_error',
'error': str(e),
})
def _process_deauth_packet(self, pkt):
"""Process a deauth/disassoc packet and emit alert if threshold exceeded."""
try:
from scapy.all import Dot11, Dot11Deauth, Dot11Disas, RadioTap
except ImportError:
return
# Determine frame type
if pkt.haslayer(Dot11Deauth):
frame_type = 'deauth'
reason_code = pkt[Dot11Deauth].reason
elif pkt.haslayer(Dot11Disas):
frame_type = 'disassoc'
reason_code = pkt[Dot11Disas].reason
else:
return
# Extract addresses from Dot11 layer
dot11 = pkt[Dot11]
dst_mac = (dot11.addr1 or '').upper()
src_mac = (dot11.addr2 or '').upper()
bssid = (dot11.addr3 or '').upper()
# Skip if addresses are missing
if not src_mac or not dst_mac:
return
# Extract signal strength from RadioTap if available
signal_dbm = None
if pkt.haslayer(RadioTap):
with contextlib.suppress(AttributeError):
signal_dbm = pkt[RadioTap].dBm_AntSignal
# Create packet info
pkt_info = DeauthPacketInfo(
timestamp=time.time(),
frame_type=frame_type,
src_mac=src_mac,
dst_mac=dst_mac,
bssid=bssid,
reason_code=reason_code,
signal_dbm=signal_dbm,
)
self._packets_captured += 1
# Track packet
tracker_key = (src_mac, dst_mac, bssid)
with self._lock:
tracker = self._trackers[tracker_key]
tracker.add_packet(pkt_info)
# Check if threshold exceeded
packets_in_window = tracker.get_packets_in_window(DEAUTH_DETECTION_WINDOW)
packet_count = len(packets_in_window)
if packet_count >= DEAUTH_ALERT_THRESHOLD and not tracker.alert_sent:
# Generate alert
alert = self._generate_alert(
tracker_key=tracker_key,
packets=packets_in_window,
packet_count=packet_count,
)
self._alerts.append(alert)
self._alerts_generated += 1
tracker.alert_sent = True
# Emit event
self.event_callback(alert.to_dict())
logger.warning(
f"Deauth attack detected: {src_mac} -> {dst_mac} "
f"({packet_count} packets in {DEAUTH_DETECTION_WINDOW}s)"
)
def _generate_alert(
self,
tracker_key: tuple[str, str, str],
packets: list[DeauthPacketInfo],
packet_count: int,
) -> DeauthAlert:
"""Generate an alert from tracked packets."""
src_mac, dst_mac, bssid = tracker_key
# Get latest packet for details
latest_pkt = packets[-1] if packets else None
# Determine severity
if packet_count >= DEAUTH_CRITICAL_THRESHOLD:
severity = 'high'
elif packet_count >= DEAUTH_ALERT_THRESHOLD * 2.5:
severity = 'medium'
else:
severity = 'low'
# Lookup AP info
ap_info = self._lookup_ap(bssid)
# Lookup target info
target_info = self._lookup_device(dst_mac)
# Determine target type
if dst_mac == 'FF:FF:FF:FF:FF:FF':
target_type = 'broadcast'
elif dst_mac in self._get_known_aps():
target_type = 'ap'
else:
target_type = 'client'
# Check if source is spoofed (matches known AP)
is_spoofed = self._check_spoofed_source(src_mac)
# Get attacker vendor
attacker_vendor = self._get_vendor(src_mac)
# Calculate packets per second
if packets:
time_span = packets[-1].timestamp - packets[0].timestamp
pps = packet_count / time_span if time_span > 0 else float(packet_count)
else:
pps = 0.0
# Determine attack type and description
if dst_mac == 'FF:FF:FF:FF:FF:FF':
attack_type = 'broadcast'
description = "Broadcast deauth flood targeting all clients on the network"
elif target_type == 'ap':
attack_type = 'ap_flood'
description = "Deauth flood targeting access point"
else:
attack_type = 'targeted'
description = f"Targeted deauth flood against {'known' if target_info.get('known_from_scan') else 'unknown'} client"
# Get reason code info
reason_code = latest_pkt.reason_code if latest_pkt else 0
reason_text = DEAUTH_REASON_CODES.get(reason_code, f"Unknown ({reason_code})")
# Get signal
signal_dbm = None
for pkt in reversed(packets):
if pkt.signal_dbm is not None:
signal_dbm = pkt.signal_dbm
break
# Generate unique ID
self._alert_counter += 1
alert_id = f"deauth-{int(time.time())}-{self._alert_counter}"
return DeauthAlert(
id=alert_id,
timestamp=time.time(),
severity=severity,
attacker_mac=src_mac,
attacker_vendor=attacker_vendor,
attacker_signal_dbm=signal_dbm,
is_spoofed_ap=is_spoofed,
target_mac=dst_mac,
target_vendor=target_info.get('vendor'),
target_type=target_type,
target_known_from_scan=target_info.get('known_from_scan', False),
ap_bssid=bssid,
ap_essid=ap_info.get('essid'),
ap_channel=ap_info.get('channel'),
frame_type=latest_pkt.frame_type if latest_pkt else 'deauth',
reason_code=reason_code,
reason_text=reason_text,
packet_count=packet_count,
window_seconds=DEAUTH_DETECTION_WINDOW,
packets_per_second=round(pps, 1),
attack_type=attack_type,
description=description,
)
def _lookup_ap(self, bssid: str) -> dict:
"""Get AP info from current scan data."""
if not self.get_networks:
return {'bssid': bssid, 'essid': None, 'channel': None}
try:
networks = self.get_networks()
ap = networks.get(bssid.upper())
if ap:
return {
'bssid': bssid,
'essid': ap.get('essid') or ap.get('ssid'),
'channel': ap.get('channel'),
}
except Exception as e:
logger.debug(f"Error looking up AP {bssid}: {e}")
return {'bssid': bssid, 'essid': None, 'channel': None}
def _lookup_device(self, mac: str) -> dict:
"""Get device info and vendor from MAC."""
vendor = self._get_vendor(mac)
known_from_scan = False
if self.get_clients:
try:
clients = self.get_clients()
if mac.upper() in clients:
known_from_scan = True
except Exception:
pass
return {
'mac': mac,
'vendor': vendor,
'known_from_scan': known_from_scan,
}
def _get_known_aps(self) -> set[str]:
"""Get set of known AP BSSIDs."""
if not self.get_networks:
return set()
try:
networks = self.get_networks()
return {bssid.upper() for bssid in networks}
except Exception:
return set()
def _check_spoofed_source(self, src_mac: str) -> bool:
"""Check if source MAC matches a known AP (spoofing indicator)."""
return src_mac.upper() in self._get_known_aps()
def _get_vendor(self, mac: str) -> str | None:
"""Get vendor from MAC OUI."""
try:
from data.oui import get_manufacturer
vendor = get_manufacturer(mac)
return vendor if vendor != 'Unknown' else None
except Exception:
pass
# Fallback to wifi constants
try:
from utils.wifi.constants import get_vendor_from_mac
return get_vendor_from_mac(mac)
except Exception:
return None
def _cleanup_old_trackers(self):
"""Remove old packets and empty trackers."""
with self._lock:
keys_to_remove = []
for key, tracker in self._trackers.items():
tracker.cleanup_old_packets(DEAUTH_DETECTION_WINDOW * 2)
if not tracker.packets:
keys_to_remove.append(key)
for key in keys_to_remove:
del self._trackers[key]