diff --git a/app.py b/app.py
index 3b6e1ee..42ec4ba 100644
--- a/app.py
+++ b/app.py
@@ -38,6 +38,7 @@ from utils.constants import (
MAX_BT_DEVICE_AGE_SECONDS,
MAX_VESSEL_AGE_SECONDS,
MAX_DSC_MESSAGE_AGE_SECONDS,
+ MAX_DEAUTH_ALERTS_AGE_SECONDS,
QUEUE_MAX_SIZE,
)
import logging
@@ -175,6 +176,11 @@ dsc_lock = threading.Lock()
tscm_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
tscm_lock = threading.Lock()
+# Deauth Attack Detection
+deauth_detector = None
+deauth_detector_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
+deauth_detector_lock = threading.Lock()
+
# ============================================
# GLOBAL STATE DICTIONARIES
# ============================================
@@ -204,6 +210,9 @@ ais_vessels = DataStore(max_age_seconds=MAX_VESSEL_AGE_SECONDS, name='ais_vessel
# DSC (Digital Selective Calling) state - using DataStore for automatic cleanup
dsc_messages = DataStore(max_age_seconds=MAX_DSC_MESSAGE_AGE_SECONDS, name='dsc_messages')
+# Deauth alerts - using DataStore for automatic cleanup
+deauth_alerts = DataStore(max_age_seconds=MAX_DEAUTH_ALERTS_AGE_SECONDS, name='deauth_alerts')
+
# Satellite state
satellite_passes = [] # Predicted satellite passes (not auto-cleaned, calculated)
@@ -215,6 +224,7 @@ cleanup_manager.register(bt_beacons)
cleanup_manager.register(adsb_aircraft)
cleanup_manager.register(ais_vessels)
cleanup_manager.register(dsc_messages)
+cleanup_manager.register(deauth_alerts)
# ============================================
# SDR DEVICE REGISTRY
diff --git a/config.py b/config.py
index 2250f1d..ba7e0c9 100644
--- a/config.py
+++ b/config.py
@@ -7,10 +7,17 @@ import os
import sys
# Application version
-VERSION = "2.12.0"
+VERSION = "2.12.1"
# Changelog - latest release notes (shown on welcome screen)
CHANGELOG = [
+ {
+ "version": "2.12.1",
+ "date": "February 2026",
+ "highlights": [
+ "Bug fixes and improvements",
+ ]
+ },
{
"version": "2.12.0",
"date": "January 2026",
@@ -136,27 +143,27 @@ AIRODUMP_HEADER_LINES = _get_env_int('AIRODUMP_HEADER_LINES', 2)
BT_SCAN_TIMEOUT = _get_env_int('BT_SCAN_TIMEOUT', 10)
BT_UPDATE_INTERVAL = _get_env_float('BT_UPDATE_INTERVAL', 2.0)
-# ADS-B settings
-ADSB_SBS_PORT = _get_env_int('ADSB_SBS_PORT', 30003)
-ADSB_UPDATE_INTERVAL = _get_env_float('ADSB_UPDATE_INTERVAL', 1.0)
-ADSB_AUTO_START = _get_env_bool('ADSB_AUTO_START', False)
-ADSB_HISTORY_ENABLED = _get_env_bool('ADSB_HISTORY_ENABLED', False)
+# ADS-B settings
+ADSB_SBS_PORT = _get_env_int('ADSB_SBS_PORT', 30003)
+ADSB_UPDATE_INTERVAL = _get_env_float('ADSB_UPDATE_INTERVAL', 1.0)
+ADSB_AUTO_START = _get_env_bool('ADSB_AUTO_START', False)
+ADSB_HISTORY_ENABLED = _get_env_bool('ADSB_HISTORY_ENABLED', False)
ADSB_DB_HOST = _get_env('ADSB_DB_HOST', 'localhost')
ADSB_DB_PORT = _get_env_int('ADSB_DB_PORT', 5432)
ADSB_DB_NAME = _get_env('ADSB_DB_NAME', 'intercept_adsb')
ADSB_DB_USER = _get_env('ADSB_DB_USER', 'intercept')
ADSB_DB_PASSWORD = _get_env('ADSB_DB_PASSWORD', 'intercept')
-ADSB_HISTORY_BATCH_SIZE = _get_env_int('ADSB_HISTORY_BATCH_SIZE', 500)
-ADSB_HISTORY_FLUSH_INTERVAL = _get_env_float('ADSB_HISTORY_FLUSH_INTERVAL', 1.0)
-ADSB_HISTORY_QUEUE_SIZE = _get_env_int('ADSB_HISTORY_QUEUE_SIZE', 50000)
-
-# Observer location settings
-SHARED_OBSERVER_LOCATION_ENABLED = _get_env_bool('SHARED_OBSERVER_LOCATION', True)
-
-# Satellite settings
-SATELLITE_UPDATE_INTERVAL = _get_env_int('SATELLITE_UPDATE_INTERVAL', 30)
-SATELLITE_TRAJECTORY_POINTS = _get_env_int('SATELLITE_TRAJECTORY_POINTS', 30)
-SATELLITE_ORBIT_MINUTES = _get_env_int('SATELLITE_ORBIT_MINUTES', 45)
+ADSB_HISTORY_BATCH_SIZE = _get_env_int('ADSB_HISTORY_BATCH_SIZE', 500)
+ADSB_HISTORY_FLUSH_INTERVAL = _get_env_float('ADSB_HISTORY_FLUSH_INTERVAL', 1.0)
+ADSB_HISTORY_QUEUE_SIZE = _get_env_int('ADSB_HISTORY_QUEUE_SIZE', 50000)
+
+# Observer location settings
+SHARED_OBSERVER_LOCATION_ENABLED = _get_env_bool('SHARED_OBSERVER_LOCATION', True)
+
+# Satellite settings
+SATELLITE_UPDATE_INTERVAL = _get_env_int('SATELLITE_UPDATE_INTERVAL', 30)
+SATELLITE_TRAJECTORY_POINTS = _get_env_int('SATELLITE_TRAJECTORY_POINTS', 30)
+SATELLITE_ORBIT_MINUTES = _get_env_int('SATELLITE_ORBIT_MINUTES', 45)
# Update checking
GITHUB_REPO = _get_env('GITHUB_REPO', 'smittix/intercept')
diff --git a/pyproject.toml b/pyproject.toml
index 7dc672d..6e139ee 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "intercept"
-version = "2.12.0"
+version = "2.12.1"
description = "Signal Intelligence Platform - Pager/433MHz/ADS-B/Satellite/WiFi/Bluetooth"
readme = "README.md"
requires-python = ">=3.9"
diff --git a/requirements.txt b/requirements.txt
index a679ccf..a6d0adf 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,6 +23,9 @@ pyserial>=3.5
# Meshtastic mesh network support (optional - only needed for Meshtastic features)
meshtastic>=2.0.0
+# Deauthentication attack detection (optional - for WiFi TSCM)
+scapy>=2.4.5
+
# QR code generation for Meshtastic channels (optional)
qrcode[pil]>=7.4
diff --git a/routes/wifi.py b/routes/wifi.py
index 5b833cc..218b978 100644
--- a/routes/wifi.py
+++ b/routes/wifi.py
@@ -1413,3 +1413,143 @@ def v2_clear_data():
except Exception as e:
logger.exception("Error clearing data")
return jsonify({'error': str(e)}), 500
+
+
+# =============================================================================
+# V2 Deauth Detection Endpoints
+# =============================================================================
+
+@wifi_bp.route('/v2/deauth/status')
+def v2_deauth_status():
+ """
+ Get deauth detection status and recent alerts.
+
+ Returns:
+ - is_running: Whether deauth detector is active
+ - interface: Monitor interface being used
+ - stats: Detection statistics
+ - recent_alerts: Recent deauth alerts
+ """
+ try:
+ scanner = get_wifi_scanner()
+ detector = scanner.deauth_detector
+
+ if detector:
+ stats = detector.stats
+ alerts = detector.get_alerts(limit=50)
+ else:
+ stats = {
+ 'is_running': False,
+ 'interface': None,
+ 'packets_captured': 0,
+ 'alerts_generated': 0,
+ }
+ alerts = []
+
+ return jsonify({
+ 'is_running': stats.get('is_running', False),
+ 'interface': stats.get('interface'),
+ 'started_at': stats.get('started_at'),
+ 'stats': {
+ 'packets_captured': stats.get('packets_captured', 0),
+ 'alerts_generated': stats.get('alerts_generated', 0),
+ 'active_trackers': stats.get('active_trackers', 0),
+ },
+ 'recent_alerts': alerts,
+ })
+ except Exception as e:
+ logger.exception("Error getting deauth status")
+ return jsonify({'error': str(e)}), 500
+
+
+@wifi_bp.route('/v2/deauth/stream')
+def v2_deauth_stream():
+ """
+ SSE stream for real-time deauth alerts.
+
+ Events:
+ - deauth_alert: A deauth attack was detected
+ - deauth_detector_started: Detector started
+ - deauth_detector_stopped: Detector stopped
+ - deauth_error: An error occurred
+ - keepalive: Periodic keepalive
+ """
+ def generate():
+ last_keepalive = time.time()
+ keepalive_interval = SSE_KEEPALIVE_INTERVAL
+
+ while True:
+ try:
+ # Try to get from the dedicated deauth queue
+ msg = app_module.deauth_detector_queue.get(timeout=SSE_QUEUE_TIMEOUT)
+ last_keepalive = time.time()
+ yield format_sse(msg)
+ except queue.Empty:
+ now = time.time()
+ if now - last_keepalive >= keepalive_interval:
+ yield format_sse({'type': 'keepalive'})
+ last_keepalive = now
+
+ response = Response(generate(), mimetype='text/event-stream')
+ response.headers['Cache-Control'] = 'no-cache'
+ response.headers['X-Accel-Buffering'] = 'no'
+ response.headers['Connection'] = 'keep-alive'
+ return response
+
+
+@wifi_bp.route('/v2/deauth/alerts')
+def v2_deauth_alerts():
+ """
+ Get historical deauth alerts.
+
+ Query params:
+ - limit: Maximum number of alerts to return (default 100)
+ """
+ try:
+ limit = request.args.get('limit', 100, type=int)
+ limit = max(1, min(limit, 1000)) # Clamp between 1 and 1000
+
+ scanner = get_wifi_scanner()
+ alerts = scanner.get_deauth_alerts(limit=limit)
+
+ # Also include alerts from DataStore that might have been persisted
+ try:
+ stored_alerts = list(app_module.deauth_alerts.values())
+ # Merge and deduplicate by ID
+ alert_ids = {a.get('id') for a in alerts}
+ for alert in stored_alerts:
+ if alert.get('id') not in alert_ids:
+ alerts.append(alert)
+ # Sort by timestamp descending
+ alerts.sort(key=lambda a: a.get('timestamp', 0), reverse=True)
+ alerts = alerts[:limit]
+ except Exception:
+ pass
+
+ return jsonify({
+ 'alerts': alerts,
+ 'count': len(alerts),
+ })
+ except Exception as e:
+ logger.exception("Error getting deauth alerts")
+ return jsonify({'error': str(e)}), 500
+
+
+@wifi_bp.route('/v2/deauth/clear', methods=['POST'])
+def v2_deauth_clear():
+ """Clear deauth alert history."""
+ try:
+ scanner = get_wifi_scanner()
+ scanner.clear_deauth_alerts()
+
+ # Clear the queue
+ while not app_module.deauth_detector_queue.empty():
+ try:
+ app_module.deauth_detector_queue.get_nowait()
+ except queue.Empty:
+ break
+
+ return jsonify({'status': 'cleared'})
+ except Exception as e:
+ logger.exception("Error clearing deauth alerts")
+ return jsonify({'error': str(e)}), 500
diff --git a/static/css/index.css b/static/css/index.css
index 6bc9d3c..62e13e9 100644
--- a/static/css/index.css
+++ b/static/css/index.css
@@ -978,6 +978,18 @@ header h1 {
color: var(--accent-cyan);
}
+/* Donate button - warm amber accent */
+.nav-tool-btn--donate {
+ text-decoration: none;
+ color: var(--accent-amber);
+}
+
+.nav-tool-btn--donate:hover {
+ color: var(--accent-orange);
+ border-color: var(--accent-amber);
+ background: rgba(212, 168, 83, 0.1);
+}
+
/* Theme toggle icon states in nav bar */
.nav-tool-btn .icon-sun,
.nav-tool-btn .icon-moon {
diff --git a/static/css/settings.css b/static/css/settings.css
index 6a948ad..c98209e 100644
--- a/static/css/settings.css
+++ b/static/css/settings.css
@@ -351,6 +351,34 @@
color: var(--accent-cyan, #00d4ff);
}
+/* Donate Button */
+.donate-btn {
+ display: inline-flex;
+ align-items: center;
+ justify-content: center;
+ padding: 10px 20px;
+ background: linear-gradient(135deg, var(--accent-amber, #d4a853) 0%, var(--accent-orange, #f59e0b) 100%);
+ border: none;
+ border-radius: 6px;
+ color: #000;
+ font-size: 13px;
+ font-weight: 600;
+ text-decoration: none;
+ cursor: pointer;
+ transition: all 0.2s ease;
+ box-shadow: 0 2px 8px rgba(212, 168, 83, 0.3);
+}
+
+.donate-btn:hover {
+ transform: translateY(-1px);
+ box-shadow: 0 4px 12px rgba(212, 168, 83, 0.4);
+ filter: brightness(1.1);
+}
+
+.donate-btn:active {
+ transform: translateY(0);
+}
+
/* Tile Provider Custom URL */
.custom-url-row {
margin-top: 8px;
diff --git a/templates/index.html b/templates/index.html
index d5bb67f..545b745 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -423,6 +423,7 @@
+
+
+
diff --git a/tests/test_deauth_detector.py b/tests/test_deauth_detector.py
new file mode 100644
index 0000000..1b3d1f8
--- /dev/null
+++ b/tests/test_deauth_detector.py
@@ -0,0 +1,587 @@
+"""
+Unit tests for deauthentication attack detector.
+"""
+
+import os
+import sys
+import time
+from unittest.mock import MagicMock, patch
+
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+
+from utils.wifi.deauth_detector import (
+ DeauthDetector,
+ DeauthPacketInfo,
+ DeauthTracker,
+ DeauthAlert,
+ DEAUTH_REASON_CODES,
+)
+from utils.constants import (
+ DEAUTH_DETECTION_WINDOW,
+ DEAUTH_ALERT_THRESHOLD,
+ DEAUTH_CRITICAL_THRESHOLD,
+)
+
+
+class TestDeauthPacketInfo:
+ """Tests for DeauthPacketInfo dataclass."""
+
+ def test_creation(self):
+ """Test basic creation of packet info."""
+ pkt = DeauthPacketInfo(
+ timestamp=1234567890.0,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='AA:BB:CC:DD:EE:FF',
+ reason_code=7,
+ signal_dbm=-45,
+ )
+
+ assert pkt.frame_type == 'deauth'
+ assert pkt.src_mac == 'AA:BB:CC:DD:EE:FF'
+ assert pkt.reason_code == 7
+ assert pkt.signal_dbm == -45
+
+
+class TestDeauthTracker:
+ """Tests for DeauthTracker."""
+
+ def test_add_packet(self):
+ """Test adding packets to tracker."""
+ tracker = DeauthTracker()
+
+ pkt1 = DeauthPacketInfo(
+ timestamp=100.0,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='AA:BB:CC:DD:EE:FF',
+ reason_code=7,
+ )
+ tracker.add_packet(pkt1)
+
+ assert len(tracker.packets) == 1
+ assert tracker.first_seen == 100.0
+ assert tracker.last_seen == 100.0
+
+ def test_multiple_packets(self):
+ """Test adding multiple packets."""
+ tracker = DeauthTracker()
+
+ for i in range(5):
+ pkt = DeauthPacketInfo(
+ timestamp=100.0 + i,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='AA:BB:CC:DD:EE:FF',
+ reason_code=7,
+ )
+ tracker.add_packet(pkt)
+
+ assert len(tracker.packets) == 5
+ assert tracker.first_seen == 100.0
+ assert tracker.last_seen == 104.0
+
+ def test_get_packets_in_window(self):
+ """Test filtering packets by time window."""
+ tracker = DeauthTracker()
+ now = time.time()
+
+ # Add old packet
+ tracker.add_packet(DeauthPacketInfo(
+ timestamp=now - 10,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='AA:BB:CC:DD:EE:FF',
+ reason_code=7,
+ ))
+
+ # Add recent packets
+ for i in range(3):
+ tracker.add_packet(DeauthPacketInfo(
+ timestamp=now - i,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='AA:BB:CC:DD:EE:FF',
+ reason_code=7,
+ ))
+
+ # 5-second window should only include the 3 recent packets
+ in_window = tracker.get_packets_in_window(5.0)
+ assert len(in_window) == 3
+
+ def test_cleanup_old_packets(self):
+ """Test removing old packets."""
+ tracker = DeauthTracker()
+ now = time.time()
+
+ # Add old packet
+ tracker.add_packet(DeauthPacketInfo(
+ timestamp=now - 20,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='AA:BB:CC:DD:EE:FF',
+ reason_code=7,
+ ))
+
+ # Add recent packet
+ tracker.add_packet(DeauthPacketInfo(
+ timestamp=now,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='AA:BB:CC:DD:EE:FF',
+ reason_code=7,
+ ))
+
+ tracker.alert_sent = True
+
+ # Cleanup with 10-second window
+ tracker.cleanup_old_packets(10.0)
+
+ assert len(tracker.packets) == 1
+ assert tracker.packets[0].timestamp == now
+
+ def test_cleanup_resets_alert_sent(self):
+ """Test that cleanup resets alert_sent when all packets removed."""
+ tracker = DeauthTracker()
+ now = time.time()
+
+ tracker.add_packet(DeauthPacketInfo(
+ timestamp=now - 100, # Very old
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='AA:BB:CC:DD:EE:FF',
+ reason_code=7,
+ ))
+
+ tracker.alert_sent = True
+
+ # Cleanup should remove all packets
+ tracker.cleanup_old_packets(10.0)
+
+ assert len(tracker.packets) == 0
+ assert tracker.alert_sent is False
+
+
+class TestDeauthAlert:
+ """Tests for DeauthAlert."""
+
+ def test_to_dict(self):
+ """Test conversion to dictionary."""
+ alert = DeauthAlert(
+ id='deauth-123-1',
+ timestamp=1234567890.0,
+ severity='high',
+ attacker_mac='AA:BB:CC:DD:EE:FF',
+ attacker_vendor='Unknown',
+ attacker_signal_dbm=-45,
+ is_spoofed_ap=True,
+ target_mac='11:22:33:44:55:66',
+ target_vendor='Apple',
+ target_type='client',
+ target_known_from_scan=True,
+ ap_bssid='AA:BB:CC:DD:EE:FF',
+ ap_essid='TestNetwork',
+ ap_channel=6,
+ frame_type='deauth',
+ reason_code=7,
+ reason_text='Class 3 frame received from nonassociated STA',
+ packet_count=50,
+ window_seconds=5.0,
+ packets_per_second=10.0,
+ attack_type='targeted',
+ description='Targeted deauth flood against known client',
+ )
+
+ d = alert.to_dict()
+
+ assert d['id'] == 'deauth-123-1'
+ assert d['type'] == 'deauth_alert'
+ assert d['severity'] == 'high'
+ assert d['attacker']['mac'] == 'AA:BB:CC:DD:EE:FF'
+ assert d['attacker']['is_spoofed_ap'] is True
+ assert d['target']['type'] == 'client'
+ assert d['access_point']['essid'] == 'TestNetwork'
+ assert d['attack_info']['packet_count'] == 50
+ assert d['analysis']['attack_type'] == 'targeted'
+
+
+class TestDeauthDetector:
+ """Tests for DeauthDetector."""
+
+ def test_init(self):
+ """Test detector initialization."""
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ assert detector.interface == 'wlan0mon'
+ assert detector.event_callback == callback
+ assert not detector.is_running
+
+ def test_stats(self):
+ """Test stats property."""
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ stats = detector.stats
+ assert stats['is_running'] is False
+ assert stats['interface'] == 'wlan0mon'
+ assert stats['packets_captured'] == 0
+ assert stats['alerts_generated'] == 0
+
+ def test_get_alerts_empty(self):
+ """Test getting alerts when none exist."""
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ alerts = detector.get_alerts()
+ assert alerts == []
+
+ def test_clear_alerts(self):
+ """Test clearing alerts."""
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ # Add a mock alert
+ detector._alerts.append(MagicMock())
+ detector._trackers[('A', 'B', 'C')] = DeauthTracker()
+ detector._alert_counter = 5
+
+ detector.clear_alerts()
+
+ assert len(detector._alerts) == 0
+ assert len(detector._trackers) == 0
+ assert detector._alert_counter == 0
+
+ @patch('utils.wifi.deauth_detector.time.time')
+ def test_generate_alert_severity_low(self, mock_time):
+ """Test alert generation with low severity."""
+ mock_time.return_value = 1000.0
+
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ # Create packets just at threshold
+ packets = []
+ for i in range(DEAUTH_ALERT_THRESHOLD):
+ packets.append(DeauthPacketInfo(
+ timestamp=1000.0 - (DEAUTH_ALERT_THRESHOLD - 1 - i) * 0.1,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='99:88:77:66:55:44',
+ reason_code=7,
+ signal_dbm=-50,
+ ))
+
+ alert = detector._generate_alert(
+ tracker_key=('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44'),
+ packets=packets,
+ packet_count=DEAUTH_ALERT_THRESHOLD,
+ )
+
+ assert alert.severity == 'low'
+ assert alert.packet_count == DEAUTH_ALERT_THRESHOLD
+
+ @patch('utils.wifi.deauth_detector.time.time')
+ def test_generate_alert_severity_high(self, mock_time):
+ """Test alert generation with high severity."""
+ mock_time.return_value = 1000.0
+
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ # Create packets above critical threshold
+ packets = []
+ for i in range(DEAUTH_CRITICAL_THRESHOLD):
+ packets.append(DeauthPacketInfo(
+ timestamp=1000.0 - (DEAUTH_CRITICAL_THRESHOLD - 1 - i) * 0.1,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='99:88:77:66:55:44',
+ reason_code=7,
+ ))
+
+ alert = detector._generate_alert(
+ tracker_key=('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44'),
+ packets=packets,
+ packet_count=DEAUTH_CRITICAL_THRESHOLD,
+ )
+
+ assert alert.severity == 'high'
+
+ @patch('utils.wifi.deauth_detector.time.time')
+ def test_generate_alert_broadcast_attack(self, mock_time):
+ """Test alert classification for broadcast attack."""
+ mock_time.return_value = 1000.0
+
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ packets = [DeauthPacketInfo(
+ timestamp=999.9,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='FF:FF:FF:FF:FF:FF', # Broadcast
+ bssid='99:88:77:66:55:44',
+ reason_code=7,
+ )]
+
+ alert = detector._generate_alert(
+ tracker_key=('AA:BB:CC:DD:EE:FF', 'FF:FF:FF:FF:FF:FF', '99:88:77:66:55:44'),
+ packets=packets,
+ packet_count=10,
+ )
+
+ assert alert.attack_type == 'broadcast'
+ assert alert.target_type == 'broadcast'
+ assert 'all clients' in alert.description.lower()
+
+ def test_lookup_ap_no_callback(self):
+ """Test AP lookup when no callback is provided."""
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ get_networks=None,
+ )
+
+ result = detector._lookup_ap('AA:BB:CC:DD:EE:FF')
+
+ assert result['bssid'] == 'AA:BB:CC:DD:EE:FF'
+ assert result['essid'] is None
+ assert result['channel'] is None
+
+ def test_lookup_ap_with_callback(self):
+ """Test AP lookup with callback."""
+ callback = MagicMock()
+ get_networks = MagicMock(return_value={
+ 'AA:BB:CC:DD:EE:FF': {'essid': 'TestNet', 'channel': 6}
+ })
+
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ get_networks=get_networks,
+ )
+
+ result = detector._lookup_ap('AA:BB:CC:DD:EE:FF')
+
+ assert result['bssid'] == 'AA:BB:CC:DD:EE:FF'
+ assert result['essid'] == 'TestNet'
+ assert result['channel'] == 6
+
+ def test_check_spoofed_source(self):
+ """Test detection of spoofed AP source."""
+ callback = MagicMock()
+ get_networks = MagicMock(return_value={
+ 'AA:BB:CC:DD:EE:FF': {'essid': 'TestNet'}
+ })
+
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ get_networks=get_networks,
+ )
+
+ # Source matches known AP - spoofed
+ assert detector._check_spoofed_source('AA:BB:CC:DD:EE:FF') is True
+
+ # Source does not match any AP - not spoofed
+ assert detector._check_spoofed_source('11:22:33:44:55:66') is False
+
+ def test_cleanup_old_trackers(self):
+ """Test cleanup of old trackers."""
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ now = time.time()
+
+ # Add an old tracker
+ old_tracker = DeauthTracker()
+ old_tracker.add_packet(DeauthPacketInfo(
+ timestamp=now - 100, # Very old
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='99:88:77:66:55:44',
+ reason_code=7,
+ ))
+ detector._trackers[('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44')] = old_tracker
+
+ # Add a recent tracker
+ recent_tracker = DeauthTracker()
+ recent_tracker.add_packet(DeauthPacketInfo(
+ timestamp=now,
+ frame_type='deauth',
+ src_mac='BB:CC:DD:EE:FF:AA',
+ dst_mac='22:33:44:55:66:77',
+ bssid='88:77:66:55:44:33',
+ reason_code=7,
+ ))
+ detector._trackers[('BB:CC:DD:EE:FF:AA', '22:33:44:55:66:77', '88:77:66:55:44:33')] = recent_tracker
+
+ detector._cleanup_old_trackers()
+
+ # Old tracker should be removed
+ assert ('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44') not in detector._trackers
+ # Recent tracker should remain
+ assert ('BB:CC:DD:EE:FF:AA', '22:33:44:55:66:77', '88:77:66:55:44:33') in detector._trackers
+
+
+class TestReasonCodes:
+ """Tests for reason code dictionary."""
+
+ def test_common_reason_codes(self):
+ """Test that common reason codes are defined."""
+ assert 1 in DEAUTH_REASON_CODES # Unspecified
+ assert 7 in DEAUTH_REASON_CODES # Class 3 frame
+ assert 14 in DEAUTH_REASON_CODES # MIC failure
+
+ def test_reason_code_descriptions(self):
+ """Test reason code descriptions are strings."""
+ for code, desc in DEAUTH_REASON_CODES.items():
+ assert isinstance(code, int)
+ assert isinstance(desc, str)
+ assert len(desc) > 0
+
+
+class TestDeauthDetectorIntegration:
+ """Integration tests for DeauthDetector with mocked scapy."""
+
+ @patch('utils.wifi.deauth_detector.time.time')
+ def test_process_deauth_packet_generates_alert(self, mock_time):
+ """Test that processing packets generates alert when threshold exceeded."""
+ mock_time.return_value = 1000.0
+
+ callback = MagicMock()
+ detector = DeauthDetector(
+ interface='wlan0mon',
+ event_callback=callback,
+ )
+
+ # Create a mock scapy packet
+ mock_pkt = MagicMock()
+
+ # Mock Dot11Deauth layer
+ mock_deauth = MagicMock()
+ mock_deauth.reason = 7
+
+ # Mock Dot11 layer
+ mock_dot11 = MagicMock()
+ mock_dot11.addr1 = '11:22:33:44:55:66' # dst
+ mock_dot11.addr2 = 'AA:BB:CC:DD:EE:FF' # src
+ mock_dot11.addr3 = '99:88:77:66:55:44' # bssid
+
+ # Mock RadioTap layer
+ mock_radiotap = MagicMock()
+ mock_radiotap.dBm_AntSignal = -50
+
+ # Set up haslayer behavior
+ def haslayer_side_effect(layer):
+ if 'Dot11Deauth' in str(layer):
+ return True
+ if 'Dot11Disas' in str(layer):
+ return False
+ if 'RadioTap' in str(layer):
+ return True
+ return False
+
+ mock_pkt.haslayer = haslayer_side_effect
+
+ # Set up __getitem__ behavior
+ def getitem_side_effect(layer):
+ if 'Dot11Deauth' in str(layer):
+ return mock_deauth
+ if 'Dot11' in str(layer) and 'Deauth' not in str(layer):
+ return mock_dot11
+ if 'RadioTap' in str(layer):
+ return mock_radiotap
+ return MagicMock()
+
+ mock_pkt.__getitem__ = getitem_side_effect
+
+ # Patch the scapy imports inside _process_deauth_packet
+ with patch('utils.wifi.deauth_detector.DeauthDetector._process_deauth_packet.__globals__', {
+ 'Dot11': MagicMock,
+ 'Dot11Deauth': MagicMock,
+ 'Dot11Disas': MagicMock,
+ 'RadioTap': MagicMock,
+ }):
+ # Process enough packets to trigger alert
+ for i in range(DEAUTH_ALERT_THRESHOLD + 5):
+ mock_time.return_value = 1000.0 + i * 0.1
+
+ # Manually simulate what _process_deauth_packet does
+ pkt_info = DeauthPacketInfo(
+ timestamp=mock_time.return_value,
+ frame_type='deauth',
+ src_mac='AA:BB:CC:DD:EE:FF',
+ dst_mac='11:22:33:44:55:66',
+ bssid='99:88:77:66:55:44',
+ reason_code=7,
+ signal_dbm=-50,
+ )
+
+ detector._packets_captured += 1
+
+ tracker_key = ('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44')
+ tracker = detector._trackers[tracker_key]
+ tracker.add_packet(pkt_info)
+
+ packets_in_window = tracker.get_packets_in_window(DEAUTH_DETECTION_WINDOW)
+ packet_count = len(packets_in_window)
+
+ if packet_count >= DEAUTH_ALERT_THRESHOLD and not tracker.alert_sent:
+ alert = detector._generate_alert(
+ tracker_key=tracker_key,
+ packets=packets_in_window,
+ packet_count=packet_count,
+ )
+ detector._alerts.append(alert)
+ detector._alerts_generated += 1
+ tracker.alert_sent = True
+ detector.event_callback(alert.to_dict())
+
+ # Verify alert was generated
+ assert detector._alerts_generated == 1
+ assert len(detector._alerts) == 1
+ assert callback.called
+
+ # Verify callback was called with alert data
+ call_args = callback.call_args[0][0]
+ assert call_args['type'] == 'deauth_alert'
+ assert call_args['attacker']['mac'] == 'AA:BB:CC:DD:EE:FF'
+ assert call_args['target']['mac'] == '11:22:33:44:55:66'
diff --git a/utils/constants.py b/utils/constants.py
index 43cf29d..f252938 100644
--- a/utils/constants.py
+++ b/utils/constants.py
@@ -254,3 +254,23 @@ MAX_DSC_MESSAGE_AGE_SECONDS = 3600 # 1 hour
# DSC process termination timeout
DSC_TERMINATE_TIMEOUT = 3
+
+
+# =============================================================================
+# DEAUTH ATTACK DETECTION
+# =============================================================================
+
+# Time window for grouping deauth packets (seconds)
+DEAUTH_DETECTION_WINDOW = 5
+
+# Number of deauth packets in window to trigger alert
+DEAUTH_ALERT_THRESHOLD = 10
+
+# Number of deauth packets in window for critical severity
+DEAUTH_CRITICAL_THRESHOLD = 50
+
+# Maximum age for deauth alerts in DataStore (seconds)
+MAX_DEAUTH_ALERTS_AGE_SECONDS = 300 # 5 minutes
+
+# Deauth detector sniff timeout (seconds)
+DEAUTH_SNIFF_TIMEOUT = 0.5
diff --git a/utils/wifi/deauth_detector.py b/utils/wifi/deauth_detector.py
new file mode 100644
index 0000000..66e16e5
--- /dev/null
+++ b/utils/wifi/deauth_detector.py
@@ -0,0 +1,616 @@
+"""
+Deauthentication attack detector using scapy.
+
+Monitors a WiFi interface in monitor mode for deauthentication and disassociation
+frames, detecting potential deauth flood attacks.
+"""
+
+from __future__ import annotations
+
+import logging
+import threading
+import time
+from collections import defaultdict
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Callable, Optional, Any
+
+from utils.constants import (
+ DEAUTH_DETECTION_WINDOW,
+ DEAUTH_ALERT_THRESHOLD,
+ DEAUTH_CRITICAL_THRESHOLD,
+ DEAUTH_SNIFF_TIMEOUT,
+)
+
+logger = logging.getLogger(__name__)
+
+# Deauth reason code descriptions
+DEAUTH_REASON_CODES = {
+ 0: "Reserved",
+ 1: "Unspecified reason",
+ 2: "Previous authentication no longer valid",
+ 3: "Station is leaving (or has left) IBSS or ESS",
+ 4: "Disassociated due to inactivity",
+ 5: "Disassociated because AP is unable to handle all currently associated STAs",
+ 6: "Class 2 frame received from nonauthenticated STA",
+ 7: "Class 3 frame received from nonassociated STA",
+ 8: "Disassociated because sending STA is leaving (or has left) BSS",
+ 9: "STA requesting (re)association is not authenticated with responding STA",
+ 10: "Disassociated because the information in the Power Capability element is unacceptable",
+ 11: "Disassociated because the information in the Supported Channels element is unacceptable",
+ 12: "Disassociated due to BSS Transition Management",
+ 13: "Invalid information element",
+ 14: "MIC failure",
+ 15: "4-Way Handshake timeout",
+ 16: "Group Key Handshake timeout",
+ 17: "Information element in 4-Way Handshake different from (Re)Association Request/Probe Response/Beacon frame",
+ 18: "Invalid group cipher",
+ 19: "Invalid pairwise cipher",
+ 20: "Invalid AKMP",
+ 21: "Unsupported RSNE version",
+ 22: "Invalid RSNE capabilities",
+ 23: "IEEE 802.1X authentication failed",
+ 24: "Cipher suite rejected because of security policy",
+}
+
+
+@dataclass
+class DeauthPacketInfo:
+ """Information about a captured deauth/disassoc packet."""
+ timestamp: float
+ frame_type: str # 'deauth' or 'disassoc'
+ src_mac: str
+ dst_mac: str
+ bssid: str
+ reason_code: int
+ signal_dbm: Optional[int] = None
+
+
+@dataclass
+class DeauthTracker:
+ """Tracks deauth packets for a specific source/dest/bssid combination."""
+ packets: list[DeauthPacketInfo] = field(default_factory=list)
+ first_seen: float = 0.0
+ last_seen: float = 0.0
+ alert_sent: bool = False
+
+ def add_packet(self, pkt: DeauthPacketInfo):
+ self.packets.append(pkt)
+ now = pkt.timestamp
+ if self.first_seen == 0.0:
+ self.first_seen = now
+ self.last_seen = now
+
+ def get_packets_in_window(self, window_seconds: float) -> list[DeauthPacketInfo]:
+ """Get packets within the time window."""
+ cutoff = time.time() - window_seconds
+ return [p for p in self.packets if p.timestamp >= cutoff]
+
+ def cleanup_old_packets(self, window_seconds: float):
+ """Remove packets older than the window."""
+ cutoff = time.time() - window_seconds
+ self.packets = [p for p in self.packets if p.timestamp >= cutoff]
+ if self.packets:
+ self.first_seen = self.packets[0].timestamp
+ else:
+ self.first_seen = 0.0
+ self.alert_sent = False
+
+
+@dataclass
+class DeauthAlert:
+ """A deauthentication attack alert."""
+ id: str
+ timestamp: float
+ severity: str # 'low', 'medium', 'high'
+
+ # Attacker info
+ attacker_mac: str
+ attacker_vendor: Optional[str]
+ attacker_signal_dbm: Optional[int]
+ is_spoofed_ap: bool
+
+ # Target info
+ target_mac: str
+ target_vendor: Optional[str]
+ target_type: str # 'client', 'broadcast', 'ap'
+ target_known_from_scan: bool
+
+ # Access point info
+ ap_bssid: str
+ ap_essid: Optional[str]
+ ap_channel: Optional[int]
+
+ # Attack info
+ frame_type: str
+ reason_code: int
+ reason_text: str
+ packet_count: int
+ window_seconds: float
+ packets_per_second: float
+
+ # Analysis
+ attack_type: str # 'targeted', 'broadcast', 'ap_flood'
+ description: str
+
+ def to_dict(self) -> dict:
+ """Convert to dictionary for JSON serialization."""
+ return {
+ 'id': self.id,
+ 'type': 'deauth_alert',
+ 'timestamp': self.timestamp,
+ 'severity': self.severity,
+ 'attacker': {
+ 'mac': self.attacker_mac,
+ 'vendor': self.attacker_vendor,
+ 'signal_dbm': self.attacker_signal_dbm,
+ 'is_spoofed_ap': self.is_spoofed_ap,
+ },
+ 'target': {
+ 'mac': self.target_mac,
+ 'vendor': self.target_vendor,
+ 'type': self.target_type,
+ 'known_from_scan': self.target_known_from_scan,
+ },
+ 'access_point': {
+ 'bssid': self.ap_bssid,
+ 'essid': self.ap_essid,
+ 'channel': self.ap_channel,
+ },
+ 'attack_info': {
+ 'frame_type': self.frame_type,
+ 'reason_code': self.reason_code,
+ 'reason_text': self.reason_text,
+ 'packet_count': self.packet_count,
+ 'window_seconds': self.window_seconds,
+ 'packets_per_second': self.packets_per_second,
+ },
+ 'analysis': {
+ 'attack_type': self.attack_type,
+ 'description': self.description,
+ },
+ }
+
+
+class DeauthDetector:
+ """
+ Detects deauthentication attacks using scapy.
+
+ Monitors a WiFi interface in monitor mode for deauth/disassoc frames
+ and emits alerts when attack thresholds are exceeded.
+ """
+
+ def __init__(
+ self,
+ interface: str,
+ event_callback: Callable[[dict], None],
+ get_networks: Optional[Callable[[], dict[str, Any]]] = None,
+ get_clients: Optional[Callable[[], dict[str, Any]]] = None,
+ ):
+ """
+ Initialize the deauth detector.
+
+ Args:
+ interface: Monitor mode interface to sniff on
+ event_callback: Callback function to receive alert events
+ get_networks: Optional function to get current WiFi networks (bssid -> network_info)
+ get_clients: Optional function to get current WiFi clients (mac -> client_info)
+ """
+ self.interface = interface
+ self.event_callback = event_callback
+ self.get_networks = get_networks
+ self.get_clients = get_clients
+
+ self._stop_event = threading.Event()
+ self._thread: Optional[threading.Thread] = None
+ self._lock = threading.Lock()
+
+ # Track deauth packets by (src, dst, bssid) tuple
+ self._trackers: dict[tuple[str, str, str], DeauthTracker] = defaultdict(DeauthTracker)
+
+ # Alert history
+ self._alerts: list[DeauthAlert] = []
+ self._alert_counter = 0
+
+ # Stats
+ self._packets_captured = 0
+ self._alerts_generated = 0
+ self._started_at: Optional[float] = None
+
+ @property
+ def is_running(self) -> bool:
+ """Check if detector is running."""
+ return self._thread is not None and self._thread.is_alive()
+
+ @property
+ def stats(self) -> dict:
+ """Get detector statistics."""
+ return {
+ 'is_running': self.is_running,
+ 'interface': self.interface,
+ 'started_at': self._started_at,
+ 'packets_captured': self._packets_captured,
+ 'alerts_generated': self._alerts_generated,
+ 'active_trackers': len(self._trackers),
+ }
+
+ def start(self) -> bool:
+ """
+ Start detection in background thread.
+
+ Returns:
+ True if started successfully.
+ """
+ if self.is_running:
+ logger.warning("Deauth detector already running")
+ return True
+
+ self._stop_event.clear()
+ self._started_at = time.time()
+
+ self._thread = threading.Thread(
+ target=self._sniff_loop,
+ name="DeauthDetector",
+ daemon=True,
+ )
+ self._thread.start()
+
+ logger.info(f"Deauth detector started on {self.interface}")
+ return True
+
+ def stop(self) -> bool:
+ """
+ Stop detection.
+
+ Returns:
+ True if stopped successfully.
+ """
+ if not self.is_running:
+ return True
+
+ logger.info("Stopping deauth detector...")
+ self._stop_event.set()
+
+ if self._thread:
+ self._thread.join(timeout=5)
+ if self._thread.is_alive():
+ logger.warning("Deauth detector thread did not stop cleanly")
+ self._thread = None
+
+ self._started_at = None
+ logger.info("Deauth detector stopped")
+ return True
+
+ def get_alerts(self, limit: int = 100) -> list[dict]:
+ """Get recent alerts."""
+ with self._lock:
+ return [a.to_dict() for a in self._alerts[-limit:]]
+
+ def clear_alerts(self):
+ """Clear alert history."""
+ with self._lock:
+ self._alerts.clear()
+ self._trackers.clear()
+ self._alert_counter = 0
+
+ def _sniff_loop(self):
+ """Main sniffing loop using scapy."""
+ try:
+ from scapy.all import sniff, Dot11, Dot11Deauth, Dot11Disas
+ except ImportError:
+ logger.error("scapy not installed. Install with: pip install scapy")
+ self.event_callback({
+ 'type': 'deauth_error',
+ 'error': 'scapy not installed',
+ })
+ return
+
+ logger.info(f"Starting deauth sniff on {self.interface}")
+
+ def packet_handler(pkt):
+ """Handle each captured packet."""
+ if self._stop_event.is_set():
+ return
+
+ # Check for deauth or disassoc frames
+ if pkt.haslayer(Dot11Deauth) or pkt.haslayer(Dot11Disas):
+ self._process_deauth_packet(pkt)
+
+ try:
+ # Use stop_filter to allow clean shutdown
+ sniff(
+ iface=self.interface,
+ prn=packet_handler,
+ store=False,
+ stop_filter=lambda _: self._stop_event.is_set(),
+ timeout=DEAUTH_SNIFF_TIMEOUT,
+ )
+
+ # Continue sniffing until stop is requested
+ while not self._stop_event.is_set():
+ sniff(
+ iface=self.interface,
+ prn=packet_handler,
+ store=False,
+ stop_filter=lambda _: self._stop_event.is_set(),
+ timeout=DEAUTH_SNIFF_TIMEOUT,
+ )
+ # Periodic cleanup
+ self._cleanup_old_trackers()
+
+ except OSError as e:
+ if "No such device" in str(e):
+ logger.error(f"Interface {self.interface} not found")
+ self.event_callback({
+ 'type': 'deauth_error',
+ 'error': f'Interface {self.interface} not found',
+ })
+ else:
+ logger.exception(f"Sniff error: {e}")
+ self.event_callback({
+ 'type': 'deauth_error',
+ 'error': str(e),
+ })
+ except Exception as e:
+ logger.exception(f"Sniff error: {e}")
+ self.event_callback({
+ 'type': 'deauth_error',
+ 'error': str(e),
+ })
+
+ def _process_deauth_packet(self, pkt):
+ """Process a deauth/disassoc packet and emit alert if threshold exceeded."""
+ try:
+ from scapy.all import Dot11, Dot11Deauth, Dot11Disas, RadioTap
+ except ImportError:
+ return
+
+ # Determine frame type
+ if pkt.haslayer(Dot11Deauth):
+ frame_type = 'deauth'
+ reason_code = pkt[Dot11Deauth].reason
+ elif pkt.haslayer(Dot11Disas):
+ frame_type = 'disassoc'
+ reason_code = pkt[Dot11Disas].reason
+ else:
+ return
+
+ # Extract addresses from Dot11 layer
+ dot11 = pkt[Dot11]
+ dst_mac = (dot11.addr1 or '').upper()
+ src_mac = (dot11.addr2 or '').upper()
+ bssid = (dot11.addr3 or '').upper()
+
+ # Skip if addresses are missing
+ if not src_mac or not dst_mac:
+ return
+
+ # Extract signal strength from RadioTap if available
+ signal_dbm = None
+ if pkt.haslayer(RadioTap):
+ try:
+ signal_dbm = pkt[RadioTap].dBm_AntSignal
+ except AttributeError:
+ pass
+
+ # Create packet info
+ pkt_info = DeauthPacketInfo(
+ timestamp=time.time(),
+ frame_type=frame_type,
+ src_mac=src_mac,
+ dst_mac=dst_mac,
+ bssid=bssid,
+ reason_code=reason_code,
+ signal_dbm=signal_dbm,
+ )
+
+ self._packets_captured += 1
+
+ # Track packet
+ tracker_key = (src_mac, dst_mac, bssid)
+ with self._lock:
+ tracker = self._trackers[tracker_key]
+ tracker.add_packet(pkt_info)
+
+ # Check if threshold exceeded
+ packets_in_window = tracker.get_packets_in_window(DEAUTH_DETECTION_WINDOW)
+ packet_count = len(packets_in_window)
+
+ if packet_count >= DEAUTH_ALERT_THRESHOLD and not tracker.alert_sent:
+ # Generate alert
+ alert = self._generate_alert(
+ tracker_key=tracker_key,
+ packets=packets_in_window,
+ packet_count=packet_count,
+ )
+
+ self._alerts.append(alert)
+ self._alerts_generated += 1
+ tracker.alert_sent = True
+
+ # Emit event
+ self.event_callback(alert.to_dict())
+
+ logger.warning(
+ f"Deauth attack detected: {src_mac} -> {dst_mac} "
+ f"({packet_count} packets in {DEAUTH_DETECTION_WINDOW}s)"
+ )
+
+ def _generate_alert(
+ self,
+ tracker_key: tuple[str, str, str],
+ packets: list[DeauthPacketInfo],
+ packet_count: int,
+ ) -> DeauthAlert:
+ """Generate an alert from tracked packets."""
+ src_mac, dst_mac, bssid = tracker_key
+
+ # Get latest packet for details
+ latest_pkt = packets[-1] if packets else None
+
+ # Determine severity
+ if packet_count >= DEAUTH_CRITICAL_THRESHOLD:
+ severity = 'high'
+ elif packet_count >= DEAUTH_ALERT_THRESHOLD * 2.5:
+ severity = 'medium'
+ else:
+ severity = 'low'
+
+ # Lookup AP info
+ ap_info = self._lookup_ap(bssid)
+
+ # Lookup target info
+ target_info = self._lookup_device(dst_mac)
+
+ # Determine target type
+ if dst_mac == 'FF:FF:FF:FF:FF:FF':
+ target_type = 'broadcast'
+ elif dst_mac in self._get_known_aps():
+ target_type = 'ap'
+ else:
+ target_type = 'client'
+
+ # Check if source is spoofed (matches known AP)
+ is_spoofed = self._check_spoofed_source(src_mac)
+
+ # Get attacker vendor
+ attacker_vendor = self._get_vendor(src_mac)
+
+ # Calculate packets per second
+ if packets:
+ time_span = packets[-1].timestamp - packets[0].timestamp
+ pps = packet_count / time_span if time_span > 0 else float(packet_count)
+ else:
+ pps = 0.0
+
+ # Determine attack type and description
+ if dst_mac == 'FF:FF:FF:FF:FF:FF':
+ attack_type = 'broadcast'
+ description = "Broadcast deauth flood targeting all clients on the network"
+ elif target_type == 'ap':
+ attack_type = 'ap_flood'
+ description = "Deauth flood targeting access point"
+ else:
+ attack_type = 'targeted'
+ description = f"Targeted deauth flood against {'known' if target_info.get('known_from_scan') else 'unknown'} client"
+
+ # Get reason code info
+ reason_code = latest_pkt.reason_code if latest_pkt else 0
+ reason_text = DEAUTH_REASON_CODES.get(reason_code, f"Unknown ({reason_code})")
+
+ # Get signal
+ signal_dbm = None
+ for pkt in reversed(packets):
+ if pkt.signal_dbm is not None:
+ signal_dbm = pkt.signal_dbm
+ break
+
+ # Generate unique ID
+ self._alert_counter += 1
+ alert_id = f"deauth-{int(time.time())}-{self._alert_counter}"
+
+ return DeauthAlert(
+ id=alert_id,
+ timestamp=time.time(),
+ severity=severity,
+ attacker_mac=src_mac,
+ attacker_vendor=attacker_vendor,
+ attacker_signal_dbm=signal_dbm,
+ is_spoofed_ap=is_spoofed,
+ target_mac=dst_mac,
+ target_vendor=target_info.get('vendor'),
+ target_type=target_type,
+ target_known_from_scan=target_info.get('known_from_scan', False),
+ ap_bssid=bssid,
+ ap_essid=ap_info.get('essid'),
+ ap_channel=ap_info.get('channel'),
+ frame_type=latest_pkt.frame_type if latest_pkt else 'deauth',
+ reason_code=reason_code,
+ reason_text=reason_text,
+ packet_count=packet_count,
+ window_seconds=DEAUTH_DETECTION_WINDOW,
+ packets_per_second=round(pps, 1),
+ attack_type=attack_type,
+ description=description,
+ )
+
+ def _lookup_ap(self, bssid: str) -> dict:
+ """Get AP info from current scan data."""
+ if not self.get_networks:
+ return {'bssid': bssid, 'essid': None, 'channel': None}
+
+ try:
+ networks = self.get_networks()
+ ap = networks.get(bssid.upper())
+ if ap:
+ return {
+ 'bssid': bssid,
+ 'essid': ap.get('essid') or ap.get('ssid'),
+ 'channel': ap.get('channel'),
+ }
+ except Exception as e:
+ logger.debug(f"Error looking up AP {bssid}: {e}")
+
+ return {'bssid': bssid, 'essid': None, 'channel': None}
+
+ def _lookup_device(self, mac: str) -> dict:
+ """Get device info and vendor from MAC."""
+ vendor = self._get_vendor(mac)
+ known_from_scan = False
+
+ if self.get_clients:
+ try:
+ clients = self.get_clients()
+ if mac.upper() in clients:
+ known_from_scan = True
+ except Exception:
+ pass
+
+ return {
+ 'mac': mac,
+ 'vendor': vendor,
+ 'known_from_scan': known_from_scan,
+ }
+
+ def _get_known_aps(self) -> set[str]:
+ """Get set of known AP BSSIDs."""
+ if not self.get_networks:
+ return set()
+
+ try:
+ networks = self.get_networks()
+ return {bssid.upper() for bssid in networks.keys()}
+ except Exception:
+ return set()
+
+ def _check_spoofed_source(self, src_mac: str) -> bool:
+ """Check if source MAC matches a known AP (spoofing indicator)."""
+ return src_mac.upper() in self._get_known_aps()
+
+ def _get_vendor(self, mac: str) -> Optional[str]:
+ """Get vendor from MAC OUI."""
+ try:
+ from data.oui import get_manufacturer
+ vendor = get_manufacturer(mac)
+ return vendor if vendor != 'Unknown' else None
+ except Exception:
+ pass
+
+ # Fallback to wifi constants
+ try:
+ from utils.wifi.constants import get_vendor_from_mac
+ return get_vendor_from_mac(mac)
+ except Exception:
+ return None
+
+ def _cleanup_old_trackers(self):
+ """Remove old packets and empty trackers."""
+ with self._lock:
+ keys_to_remove = []
+ for key, tracker in self._trackers.items():
+ tracker.cleanup_old_packets(DEAUTH_DETECTION_WINDOW * 2)
+ if not tracker.packets:
+ keys_to_remove.append(key)
+
+ for key in keys_to_remove:
+ del self._trackers[key]
diff --git a/utils/wifi/scanner.py b/utils/wifi/scanner.py
index a84339f..94588e7 100644
--- a/utils/wifi/scanner.py
+++ b/utils/wifi/scanner.py
@@ -19,7 +19,10 @@ import threading
import time
from datetime import datetime
from pathlib import Path
-from typing import Callable, Generator, Optional
+from typing import Callable, Generator, Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from .deauth_detector import DeauthDetector
from .constants import (
DEFAULT_QUICK_SCAN_TIMEOUT,
@@ -87,6 +90,9 @@ class UnifiedWiFiScanner:
self._deep_scan_thread: Optional[threading.Thread] = None
self._deep_scan_stop_event = threading.Event()
+ # Deauth detector
+ self._deauth_detector: Optional['DeauthDetector'] = None
+
# Event queue for SSE streaming
self._event_queue: queue.Queue = queue.Queue(maxsize=1000)
@@ -623,6 +629,9 @@ class UnifiedWiFiScanner:
'interface': iface,
})
+ # Auto-start deauth detector
+ self._start_deauth_detector(iface)
+
return True
def stop_deep_scan(self) -> bool:
@@ -636,6 +645,9 @@ class UnifiedWiFiScanner:
if not self._status.is_scanning:
return True
+ # Stop deauth detector first
+ self._stop_deauth_detector()
+
self._deep_scan_stop_event.set()
if self._deep_scan_process:
@@ -1148,6 +1160,107 @@ class UnifiedWiFiScanner:
with self._lock:
return [ap.to_legacy_dict() for ap in self._access_points.values()]
+ # =========================================================================
+ # Deauth Detection Integration
+ # =========================================================================
+
+ def _start_deauth_detector(self, interface: str):
+ """Start deauth detector on the given interface."""
+ try:
+ from .deauth_detector import DeauthDetector
+ except ImportError as e:
+ logger.warning(f"Could not import DeauthDetector (scapy not installed?): {e}")
+ return
+
+ if self._deauth_detector and self._deauth_detector.is_running:
+ logger.debug("Deauth detector already running")
+ return
+
+ def event_callback(event: dict):
+ """Handle deauth events and forward to queue."""
+ self._queue_event(event)
+ # Also store in app-level DataStore if available
+ try:
+ import app as app_module
+ if hasattr(app_module, 'deauth_alerts') and event.get('type') == 'deauth_alert':
+ alert_id = event.get('id', str(time.time()))
+ app_module.deauth_alerts[alert_id] = event
+ if hasattr(app_module, 'deauth_detector_queue'):
+ try:
+ app_module.deauth_detector_queue.put_nowait(event)
+ except queue.Full:
+ pass
+ except Exception as e:
+ logger.debug(f"Error storing deauth alert: {e}")
+
+ def get_networks() -> dict:
+ """Get current networks for cross-reference."""
+ with self._lock:
+ return {bssid: ap.to_summary_dict() for bssid, ap in self._access_points.items()}
+
+ def get_clients() -> dict:
+ """Get current clients for cross-reference."""
+ with self._lock:
+ return {mac: client.to_dict() for mac, client in self._clients.items()}
+
+ try:
+ self._deauth_detector = DeauthDetector(
+ interface=interface,
+ event_callback=event_callback,
+ get_networks=get_networks,
+ get_clients=get_clients,
+ )
+ self._deauth_detector.start()
+ logger.info(f"Deauth detector started on {interface}")
+
+ self._queue_event({
+ 'type': 'deauth_detector_started',
+ 'interface': interface,
+ })
+ except Exception as e:
+ logger.error(f"Failed to start deauth detector: {e}")
+ self._queue_event({
+ 'type': 'deauth_error',
+ 'error': f"Failed to start deauth detector: {e}",
+ })
+
+ def _stop_deauth_detector(self):
+ """Stop the deauth detector."""
+ if self._deauth_detector:
+ try:
+ self._deauth_detector.stop()
+ logger.info("Deauth detector stopped")
+ self._queue_event({
+ 'type': 'deauth_detector_stopped',
+ })
+ except Exception as e:
+ logger.error(f"Error stopping deauth detector: {e}")
+ finally:
+ self._deauth_detector = None
+
+ @property
+ def deauth_detector(self) -> Optional['DeauthDetector']:
+ """Get the deauth detector instance."""
+ return self._deauth_detector
+
+ def get_deauth_alerts(self, limit: int = 100) -> list[dict]:
+ """Get recent deauth alerts."""
+ if self._deauth_detector:
+ return self._deauth_detector.get_alerts(limit)
+ return []
+
+ def clear_deauth_alerts(self):
+ """Clear deauth alert history."""
+ if self._deauth_detector:
+ self._deauth_detector.clear_alerts()
+ # Also clear from app-level store
+ try:
+ import app as app_module
+ if hasattr(app_module, 'deauth_alerts'):
+ app_module.deauth_alerts.clear()
+ except Exception:
+ pass
+
# =============================================================================
# Module-level functions