Release v2.12.1

Bug fixes and improvements.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-02 22:16:12 +00:00
parent d1f1ce1f4b
commit f795180c7d
13 changed files with 1573 additions and 19 deletions

10
app.py
View File

@@ -38,6 +38,7 @@ from utils.constants import (
MAX_BT_DEVICE_AGE_SECONDS,
MAX_VESSEL_AGE_SECONDS,
MAX_DSC_MESSAGE_AGE_SECONDS,
MAX_DEAUTH_ALERTS_AGE_SECONDS,
QUEUE_MAX_SIZE,
)
import logging
@@ -175,6 +176,11 @@ dsc_lock = threading.Lock()
tscm_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
tscm_lock = threading.Lock()
# Deauth Attack Detection
deauth_detector = None
deauth_detector_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
deauth_detector_lock = threading.Lock()
# ============================================
# GLOBAL STATE DICTIONARIES
# ============================================
@@ -204,6 +210,9 @@ ais_vessels = DataStore(max_age_seconds=MAX_VESSEL_AGE_SECONDS, name='ais_vessel
# DSC (Digital Selective Calling) state - using DataStore for automatic cleanup
dsc_messages = DataStore(max_age_seconds=MAX_DSC_MESSAGE_AGE_SECONDS, name='dsc_messages')
# Deauth alerts - using DataStore for automatic cleanup
deauth_alerts = DataStore(max_age_seconds=MAX_DEAUTH_ALERTS_AGE_SECONDS, name='deauth_alerts')
# Satellite state
satellite_passes = [] # Predicted satellite passes (not auto-cleaned, calculated)
@@ -215,6 +224,7 @@ cleanup_manager.register(bt_beacons)
cleanup_manager.register(adsb_aircraft)
cleanup_manager.register(ais_vessels)
cleanup_manager.register(dsc_messages)
cleanup_manager.register(deauth_alerts)
# ============================================
# SDR DEVICE REGISTRY

View File

@@ -7,10 +7,17 @@ import os
import sys
# Application version
VERSION = "2.12.0"
VERSION = "2.12.1"
# Changelog - latest release notes (shown on welcome screen)
CHANGELOG = [
{
"version": "2.12.1",
"date": "February 2026",
"highlights": [
"Bug fixes and improvements",
]
},
{
"version": "2.12.0",
"date": "January 2026",
@@ -136,27 +143,27 @@ AIRODUMP_HEADER_LINES = _get_env_int('AIRODUMP_HEADER_LINES', 2)
BT_SCAN_TIMEOUT = _get_env_int('BT_SCAN_TIMEOUT', 10)
BT_UPDATE_INTERVAL = _get_env_float('BT_UPDATE_INTERVAL', 2.0)
# ADS-B settings
ADSB_SBS_PORT = _get_env_int('ADSB_SBS_PORT', 30003)
ADSB_UPDATE_INTERVAL = _get_env_float('ADSB_UPDATE_INTERVAL', 1.0)
ADSB_AUTO_START = _get_env_bool('ADSB_AUTO_START', False)
ADSB_HISTORY_ENABLED = _get_env_bool('ADSB_HISTORY_ENABLED', False)
# ADS-B settings
ADSB_SBS_PORT = _get_env_int('ADSB_SBS_PORT', 30003)
ADSB_UPDATE_INTERVAL = _get_env_float('ADSB_UPDATE_INTERVAL', 1.0)
ADSB_AUTO_START = _get_env_bool('ADSB_AUTO_START', False)
ADSB_HISTORY_ENABLED = _get_env_bool('ADSB_HISTORY_ENABLED', False)
ADSB_DB_HOST = _get_env('ADSB_DB_HOST', 'localhost')
ADSB_DB_PORT = _get_env_int('ADSB_DB_PORT', 5432)
ADSB_DB_NAME = _get_env('ADSB_DB_NAME', 'intercept_adsb')
ADSB_DB_USER = _get_env('ADSB_DB_USER', 'intercept')
ADSB_DB_PASSWORD = _get_env('ADSB_DB_PASSWORD', 'intercept')
ADSB_HISTORY_BATCH_SIZE = _get_env_int('ADSB_HISTORY_BATCH_SIZE', 500)
ADSB_HISTORY_FLUSH_INTERVAL = _get_env_float('ADSB_HISTORY_FLUSH_INTERVAL', 1.0)
ADSB_HISTORY_QUEUE_SIZE = _get_env_int('ADSB_HISTORY_QUEUE_SIZE', 50000)
# Observer location settings
SHARED_OBSERVER_LOCATION_ENABLED = _get_env_bool('SHARED_OBSERVER_LOCATION', True)
# Satellite settings
SATELLITE_UPDATE_INTERVAL = _get_env_int('SATELLITE_UPDATE_INTERVAL', 30)
SATELLITE_TRAJECTORY_POINTS = _get_env_int('SATELLITE_TRAJECTORY_POINTS', 30)
SATELLITE_ORBIT_MINUTES = _get_env_int('SATELLITE_ORBIT_MINUTES', 45)
ADSB_HISTORY_BATCH_SIZE = _get_env_int('ADSB_HISTORY_BATCH_SIZE', 500)
ADSB_HISTORY_FLUSH_INTERVAL = _get_env_float('ADSB_HISTORY_FLUSH_INTERVAL', 1.0)
ADSB_HISTORY_QUEUE_SIZE = _get_env_int('ADSB_HISTORY_QUEUE_SIZE', 50000)
# Observer location settings
SHARED_OBSERVER_LOCATION_ENABLED = _get_env_bool('SHARED_OBSERVER_LOCATION', True)
# Satellite settings
SATELLITE_UPDATE_INTERVAL = _get_env_int('SATELLITE_UPDATE_INTERVAL', 30)
SATELLITE_TRAJECTORY_POINTS = _get_env_int('SATELLITE_TRAJECTORY_POINTS', 30)
SATELLITE_ORBIT_MINUTES = _get_env_int('SATELLITE_ORBIT_MINUTES', 45)
# Update checking
GITHUB_REPO = _get_env('GITHUB_REPO', 'smittix/intercept')

View File

@@ -1,6 +1,6 @@
[project]
name = "intercept"
version = "2.12.0"
version = "2.12.1"
description = "Signal Intelligence Platform - Pager/433MHz/ADS-B/Satellite/WiFi/Bluetooth"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -23,6 +23,9 @@ pyserial>=3.5
# Meshtastic mesh network support (optional - only needed for Meshtastic features)
meshtastic>=2.0.0
# Deauthentication attack detection (optional - for WiFi TSCM)
scapy>=2.4.5
# QR code generation for Meshtastic channels (optional)
qrcode[pil]>=7.4

View File

@@ -1413,3 +1413,143 @@ def v2_clear_data():
except Exception as e:
logger.exception("Error clearing data")
return jsonify({'error': str(e)}), 500
# =============================================================================
# V2 Deauth Detection Endpoints
# =============================================================================
@wifi_bp.route('/v2/deauth/status')
def v2_deauth_status():
"""
Get deauth detection status and recent alerts.
Returns:
- is_running: Whether deauth detector is active
- interface: Monitor interface being used
- stats: Detection statistics
- recent_alerts: Recent deauth alerts
"""
try:
scanner = get_wifi_scanner()
detector = scanner.deauth_detector
if detector:
stats = detector.stats
alerts = detector.get_alerts(limit=50)
else:
stats = {
'is_running': False,
'interface': None,
'packets_captured': 0,
'alerts_generated': 0,
}
alerts = []
return jsonify({
'is_running': stats.get('is_running', False),
'interface': stats.get('interface'),
'started_at': stats.get('started_at'),
'stats': {
'packets_captured': stats.get('packets_captured', 0),
'alerts_generated': stats.get('alerts_generated', 0),
'active_trackers': stats.get('active_trackers', 0),
},
'recent_alerts': alerts,
})
except Exception as e:
logger.exception("Error getting deauth status")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/deauth/stream')
def v2_deauth_stream():
"""
SSE stream for real-time deauth alerts.
Events:
- deauth_alert: A deauth attack was detected
- deauth_detector_started: Detector started
- deauth_detector_stopped: Detector stopped
- deauth_error: An error occurred
- keepalive: Periodic keepalive
"""
def generate():
last_keepalive = time.time()
keepalive_interval = SSE_KEEPALIVE_INTERVAL
while True:
try:
# Try to get from the dedicated deauth queue
msg = app_module.deauth_detector_queue.get(timeout=SSE_QUEUE_TIMEOUT)
last_keepalive = time.time()
yield format_sse(msg)
except queue.Empty:
now = time.time()
if now - last_keepalive >= keepalive_interval:
yield format_sse({'type': 'keepalive'})
last_keepalive = now
response = Response(generate(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
response.headers['Connection'] = 'keep-alive'
return response
@wifi_bp.route('/v2/deauth/alerts')
def v2_deauth_alerts():
"""
Get historical deauth alerts.
Query params:
- limit: Maximum number of alerts to return (default 100)
"""
try:
limit = request.args.get('limit', 100, type=int)
limit = max(1, min(limit, 1000)) # Clamp between 1 and 1000
scanner = get_wifi_scanner()
alerts = scanner.get_deauth_alerts(limit=limit)
# Also include alerts from DataStore that might have been persisted
try:
stored_alerts = list(app_module.deauth_alerts.values())
# Merge and deduplicate by ID
alert_ids = {a.get('id') for a in alerts}
for alert in stored_alerts:
if alert.get('id') not in alert_ids:
alerts.append(alert)
# Sort by timestamp descending
alerts.sort(key=lambda a: a.get('timestamp', 0), reverse=True)
alerts = alerts[:limit]
except Exception:
pass
return jsonify({
'alerts': alerts,
'count': len(alerts),
})
except Exception as e:
logger.exception("Error getting deauth alerts")
return jsonify({'error': str(e)}), 500
@wifi_bp.route('/v2/deauth/clear', methods=['POST'])
def v2_deauth_clear():
"""Clear deauth alert history."""
try:
scanner = get_wifi_scanner()
scanner.clear_deauth_alerts()
# Clear the queue
while not app_module.deauth_detector_queue.empty():
try:
app_module.deauth_detector_queue.get_nowait()
except queue.Empty:
break
return jsonify({'status': 'cleared'})
except Exception as e:
logger.exception("Error clearing deauth alerts")
return jsonify({'error': str(e)}), 500

View File

@@ -978,6 +978,18 @@ header h1 {
color: var(--accent-cyan);
}
/* Donate button - warm amber accent */
.nav-tool-btn--donate {
text-decoration: none;
color: var(--accent-amber);
}
.nav-tool-btn--donate:hover {
color: var(--accent-orange);
border-color: var(--accent-amber);
background: rgba(212, 168, 83, 0.1);
}
/* Theme toggle icon states in nav bar */
.nav-tool-btn .icon-sun,
.nav-tool-btn .icon-moon {

View File

@@ -351,6 +351,34 @@
color: var(--accent-cyan, #00d4ff);
}
/* Donate Button */
.donate-btn {
display: inline-flex;
align-items: center;
justify-content: center;
padding: 10px 20px;
background: linear-gradient(135deg, var(--accent-amber, #d4a853) 0%, var(--accent-orange, #f59e0b) 100%);
border: none;
border-radius: 6px;
color: #000;
font-size: 13px;
font-weight: 600;
text-decoration: none;
cursor: pointer;
transition: all 0.2s ease;
box-shadow: 0 2px 8px rgba(212, 168, 83, 0.3);
}
.donate-btn:hover {
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(212, 168, 83, 0.4);
filter: brightness(1.1);
}
.donate-btn:active {
transform: translateY(0);
}
/* Tile Provider Custom URL */
.custom-url-row {
margin-top: 8px;

View File

@@ -423,6 +423,7 @@
<a href="/controller/monitor" class="nav-tool-btn" title="Network Monitor - Multi-Agent View" style="text-decoration: none;"><span class="icon"><svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><circle cx="12" cy="12" r="10"/><line x1="2" y1="12" x2="22" y2="12"/><path d="M12 2a15.3 15.3 0 0 1 4 10 15.3 15.3 0 0 1-4 10 15.3 15.3 0 0 1-4-10 15.3 15.3 0 0 1 4-10z"/></svg></span></a>
<a href="/controller/manage" class="nav-tool-btn" title="Manage Remote Agents" style="text-decoration: none;"><span class="icon"><svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><rect x="2" y="2" width="20" height="8" rx="2" ry="2"/><rect x="2" y="14" width="20" height="8" rx="2" ry="2"/><line x1="6" y1="6" x2="6.01" y2="6"/><line x1="6" y1="18" x2="6.01" y2="18"/></svg></span></a>
<button class="nav-tool-btn" onclick="showSettings()" title="Settings"><span class="icon"><svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><circle cx="12" cy="12" r="3"/><path d="M19.4 15a1.65 1.65 0 0 0 .33 1.82l.06.06a2 2 0 0 1 0 2.83 2 2 0 0 1-2.83 0l-.06-.06a1.65 1.65 0 0 0-1.82-.33 1.65 1.65 0 0 0-1 1.51V21a2 2 0 0 1-2 2 2 2 0 0 1-2-2v-.09A1.65 1.65 0 0 0 9 19.4a1.65 1.65 0 0 0-1.82.33l-.06.06a2 2 0 0 1-2.83 0 2 2 0 0 1 0-2.83l.06-.06a1.65 1.65 0 0 0 .33-1.82 1.65 1.65 0 0 0-1.51-1H3a2 2 0 0 1-2-2 2 2 0 0 1 2-2h.09A1.65 1.65 0 0 0 4.6 9a1.65 1.65 0 0 0-.33-1.82l-.06-.06a2 2 0 0 1 0-2.83 2 2 0 0 1 2.83 0l.06.06a1.65 1.65 0 0 0 1.82.33H9a1.65 1.65 0 0 0 1-1.51V3a2 2 0 0 1 2-2 2 2 0 0 1 2 2v.09a1.65 1.65 0 0 0 1 1.51 1.65 1.65 0 0 0 1.82-.33l.06-.06a2 2 0 0 1 2.83 0 2 2 0 0 1 0 2.83l-.06.06a1.65 1.65 0 0 0-.33 1.82V9a1.65 1.65 0 0 0 1.51 1H21a2 2 0 0 1 2 2 2 2 0 0 1-2 2h-.09a1.65 1.65 0 0 0-1.51 1z"/></svg></span></button>
<a href="https://buymeacoffee.com/smittix" target="_blank" rel="noopener noreferrer" class="nav-tool-btn nav-tool-btn--donate" title="Support the Project"><span class="icon"><svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M17 8h1a4 4 0 1 1 0 8h-1"/><path d="M3 8h14v9a4 4 0 0 1-4 4H7a4 4 0 0 1-4-4Z"/><line x1="6" y1="2" x2="6" y2="4"/><line x1="10" y1="2" x2="10" y2="4"/><line x1="14" y1="2" x2="14" y2="4"/></svg></span></a>
<button class="nav-tool-btn" onclick="showHelp()" title="Help & Documentation">?</button>
<button class="nav-tool-btn" onclick="logout(event)" title="Logout">
<span class="power-icon icon"><svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M9 21H5a2 2 0 0 1-2-2V5a2 2 0 0 1 2-2h4"/><polyline points="16 17 21 12 16 7"/><line x1="21" y1="12" x2="9" y2="12"/></svg></span>

View File

@@ -295,6 +295,23 @@
</p>
</div>
</div>
<div class="settings-group">
<div class="settings-group-title">Support the Project</div>
<p style="color: var(--text-dim); margin-bottom: 15px; font-size: 12px;">
If you find iNTERCEPT useful, consider supporting its development.
</p>
<a href="https://buymeacoffee.com/smittix" target="_blank" rel="noopener noreferrer" class="donate-btn">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" style="width: 18px; height: 18px; vertical-align: -3px; margin-right: 8px;">
<path d="M17 8h1a4 4 0 1 1 0 8h-1"/>
<path d="M3 8h14v9a4 4 0 0 1-4 4H7a4 4 0 0 1-4-4Z"/>
<line x1="6" y1="2" x2="6" y2="4"/>
<line x1="10" y1="2" x2="10" y2="4"/>
<line x1="14" y1="2" x2="14" y2="4"/>
</svg>
Buy Me a Coffee
</a>
</div>
</div>
</div>
</div>

View File

@@ -0,0 +1,587 @@
"""
Unit tests for deauthentication attack detector.
"""
import os
import sys
import time
from unittest.mock import MagicMock, patch
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from utils.wifi.deauth_detector import (
DeauthDetector,
DeauthPacketInfo,
DeauthTracker,
DeauthAlert,
DEAUTH_REASON_CODES,
)
from utils.constants import (
DEAUTH_DETECTION_WINDOW,
DEAUTH_ALERT_THRESHOLD,
DEAUTH_CRITICAL_THRESHOLD,
)
class TestDeauthPacketInfo:
"""Tests for DeauthPacketInfo dataclass."""
def test_creation(self):
"""Test basic creation of packet info."""
pkt = DeauthPacketInfo(
timestamp=1234567890.0,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='AA:BB:CC:DD:EE:FF',
reason_code=7,
signal_dbm=-45,
)
assert pkt.frame_type == 'deauth'
assert pkt.src_mac == 'AA:BB:CC:DD:EE:FF'
assert pkt.reason_code == 7
assert pkt.signal_dbm == -45
class TestDeauthTracker:
"""Tests for DeauthTracker."""
def test_add_packet(self):
"""Test adding packets to tracker."""
tracker = DeauthTracker()
pkt1 = DeauthPacketInfo(
timestamp=100.0,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='AA:BB:CC:DD:EE:FF',
reason_code=7,
)
tracker.add_packet(pkt1)
assert len(tracker.packets) == 1
assert tracker.first_seen == 100.0
assert tracker.last_seen == 100.0
def test_multiple_packets(self):
"""Test adding multiple packets."""
tracker = DeauthTracker()
for i in range(5):
pkt = DeauthPacketInfo(
timestamp=100.0 + i,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='AA:BB:CC:DD:EE:FF',
reason_code=7,
)
tracker.add_packet(pkt)
assert len(tracker.packets) == 5
assert tracker.first_seen == 100.0
assert tracker.last_seen == 104.0
def test_get_packets_in_window(self):
"""Test filtering packets by time window."""
tracker = DeauthTracker()
now = time.time()
# Add old packet
tracker.add_packet(DeauthPacketInfo(
timestamp=now - 10,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='AA:BB:CC:DD:EE:FF',
reason_code=7,
))
# Add recent packets
for i in range(3):
tracker.add_packet(DeauthPacketInfo(
timestamp=now - i,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='AA:BB:CC:DD:EE:FF',
reason_code=7,
))
# 5-second window should only include the 3 recent packets
in_window = tracker.get_packets_in_window(5.0)
assert len(in_window) == 3
def test_cleanup_old_packets(self):
"""Test removing old packets."""
tracker = DeauthTracker()
now = time.time()
# Add old packet
tracker.add_packet(DeauthPacketInfo(
timestamp=now - 20,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='AA:BB:CC:DD:EE:FF',
reason_code=7,
))
# Add recent packet
tracker.add_packet(DeauthPacketInfo(
timestamp=now,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='AA:BB:CC:DD:EE:FF',
reason_code=7,
))
tracker.alert_sent = True
# Cleanup with 10-second window
tracker.cleanup_old_packets(10.0)
assert len(tracker.packets) == 1
assert tracker.packets[0].timestamp == now
def test_cleanup_resets_alert_sent(self):
"""Test that cleanup resets alert_sent when all packets removed."""
tracker = DeauthTracker()
now = time.time()
tracker.add_packet(DeauthPacketInfo(
timestamp=now - 100, # Very old
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='AA:BB:CC:DD:EE:FF',
reason_code=7,
))
tracker.alert_sent = True
# Cleanup should remove all packets
tracker.cleanup_old_packets(10.0)
assert len(tracker.packets) == 0
assert tracker.alert_sent is False
class TestDeauthAlert:
"""Tests for DeauthAlert."""
def test_to_dict(self):
"""Test conversion to dictionary."""
alert = DeauthAlert(
id='deauth-123-1',
timestamp=1234567890.0,
severity='high',
attacker_mac='AA:BB:CC:DD:EE:FF',
attacker_vendor='Unknown',
attacker_signal_dbm=-45,
is_spoofed_ap=True,
target_mac='11:22:33:44:55:66',
target_vendor='Apple',
target_type='client',
target_known_from_scan=True,
ap_bssid='AA:BB:CC:DD:EE:FF',
ap_essid='TestNetwork',
ap_channel=6,
frame_type='deauth',
reason_code=7,
reason_text='Class 3 frame received from nonassociated STA',
packet_count=50,
window_seconds=5.0,
packets_per_second=10.0,
attack_type='targeted',
description='Targeted deauth flood against known client',
)
d = alert.to_dict()
assert d['id'] == 'deauth-123-1'
assert d['type'] == 'deauth_alert'
assert d['severity'] == 'high'
assert d['attacker']['mac'] == 'AA:BB:CC:DD:EE:FF'
assert d['attacker']['is_spoofed_ap'] is True
assert d['target']['type'] == 'client'
assert d['access_point']['essid'] == 'TestNetwork'
assert d['attack_info']['packet_count'] == 50
assert d['analysis']['attack_type'] == 'targeted'
class TestDeauthDetector:
"""Tests for DeauthDetector."""
def test_init(self):
"""Test detector initialization."""
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
assert detector.interface == 'wlan0mon'
assert detector.event_callback == callback
assert not detector.is_running
def test_stats(self):
"""Test stats property."""
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
stats = detector.stats
assert stats['is_running'] is False
assert stats['interface'] == 'wlan0mon'
assert stats['packets_captured'] == 0
assert stats['alerts_generated'] == 0
def test_get_alerts_empty(self):
"""Test getting alerts when none exist."""
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
alerts = detector.get_alerts()
assert alerts == []
def test_clear_alerts(self):
"""Test clearing alerts."""
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
# Add a mock alert
detector._alerts.append(MagicMock())
detector._trackers[('A', 'B', 'C')] = DeauthTracker()
detector._alert_counter = 5
detector.clear_alerts()
assert len(detector._alerts) == 0
assert len(detector._trackers) == 0
assert detector._alert_counter == 0
@patch('utils.wifi.deauth_detector.time.time')
def test_generate_alert_severity_low(self, mock_time):
"""Test alert generation with low severity."""
mock_time.return_value = 1000.0
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
# Create packets just at threshold
packets = []
for i in range(DEAUTH_ALERT_THRESHOLD):
packets.append(DeauthPacketInfo(
timestamp=1000.0 - (DEAUTH_ALERT_THRESHOLD - 1 - i) * 0.1,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='99:88:77:66:55:44',
reason_code=7,
signal_dbm=-50,
))
alert = detector._generate_alert(
tracker_key=('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44'),
packets=packets,
packet_count=DEAUTH_ALERT_THRESHOLD,
)
assert alert.severity == 'low'
assert alert.packet_count == DEAUTH_ALERT_THRESHOLD
@patch('utils.wifi.deauth_detector.time.time')
def test_generate_alert_severity_high(self, mock_time):
"""Test alert generation with high severity."""
mock_time.return_value = 1000.0
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
# Create packets above critical threshold
packets = []
for i in range(DEAUTH_CRITICAL_THRESHOLD):
packets.append(DeauthPacketInfo(
timestamp=1000.0 - (DEAUTH_CRITICAL_THRESHOLD - 1 - i) * 0.1,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='99:88:77:66:55:44',
reason_code=7,
))
alert = detector._generate_alert(
tracker_key=('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44'),
packets=packets,
packet_count=DEAUTH_CRITICAL_THRESHOLD,
)
assert alert.severity == 'high'
@patch('utils.wifi.deauth_detector.time.time')
def test_generate_alert_broadcast_attack(self, mock_time):
"""Test alert classification for broadcast attack."""
mock_time.return_value = 1000.0
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
packets = [DeauthPacketInfo(
timestamp=999.9,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='FF:FF:FF:FF:FF:FF', # Broadcast
bssid='99:88:77:66:55:44',
reason_code=7,
)]
alert = detector._generate_alert(
tracker_key=('AA:BB:CC:DD:EE:FF', 'FF:FF:FF:FF:FF:FF', '99:88:77:66:55:44'),
packets=packets,
packet_count=10,
)
assert alert.attack_type == 'broadcast'
assert alert.target_type == 'broadcast'
assert 'all clients' in alert.description.lower()
def test_lookup_ap_no_callback(self):
"""Test AP lookup when no callback is provided."""
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
get_networks=None,
)
result = detector._lookup_ap('AA:BB:CC:DD:EE:FF')
assert result['bssid'] == 'AA:BB:CC:DD:EE:FF'
assert result['essid'] is None
assert result['channel'] is None
def test_lookup_ap_with_callback(self):
"""Test AP lookup with callback."""
callback = MagicMock()
get_networks = MagicMock(return_value={
'AA:BB:CC:DD:EE:FF': {'essid': 'TestNet', 'channel': 6}
})
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
get_networks=get_networks,
)
result = detector._lookup_ap('AA:BB:CC:DD:EE:FF')
assert result['bssid'] == 'AA:BB:CC:DD:EE:FF'
assert result['essid'] == 'TestNet'
assert result['channel'] == 6
def test_check_spoofed_source(self):
"""Test detection of spoofed AP source."""
callback = MagicMock()
get_networks = MagicMock(return_value={
'AA:BB:CC:DD:EE:FF': {'essid': 'TestNet'}
})
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
get_networks=get_networks,
)
# Source matches known AP - spoofed
assert detector._check_spoofed_source('AA:BB:CC:DD:EE:FF') is True
# Source does not match any AP - not spoofed
assert detector._check_spoofed_source('11:22:33:44:55:66') is False
def test_cleanup_old_trackers(self):
"""Test cleanup of old trackers."""
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
now = time.time()
# Add an old tracker
old_tracker = DeauthTracker()
old_tracker.add_packet(DeauthPacketInfo(
timestamp=now - 100, # Very old
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='99:88:77:66:55:44',
reason_code=7,
))
detector._trackers[('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44')] = old_tracker
# Add a recent tracker
recent_tracker = DeauthTracker()
recent_tracker.add_packet(DeauthPacketInfo(
timestamp=now,
frame_type='deauth',
src_mac='BB:CC:DD:EE:FF:AA',
dst_mac='22:33:44:55:66:77',
bssid='88:77:66:55:44:33',
reason_code=7,
))
detector._trackers[('BB:CC:DD:EE:FF:AA', '22:33:44:55:66:77', '88:77:66:55:44:33')] = recent_tracker
detector._cleanup_old_trackers()
# Old tracker should be removed
assert ('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44') not in detector._trackers
# Recent tracker should remain
assert ('BB:CC:DD:EE:FF:AA', '22:33:44:55:66:77', '88:77:66:55:44:33') in detector._trackers
class TestReasonCodes:
"""Tests for reason code dictionary."""
def test_common_reason_codes(self):
"""Test that common reason codes are defined."""
assert 1 in DEAUTH_REASON_CODES # Unspecified
assert 7 in DEAUTH_REASON_CODES # Class 3 frame
assert 14 in DEAUTH_REASON_CODES # MIC failure
def test_reason_code_descriptions(self):
"""Test reason code descriptions are strings."""
for code, desc in DEAUTH_REASON_CODES.items():
assert isinstance(code, int)
assert isinstance(desc, str)
assert len(desc) > 0
class TestDeauthDetectorIntegration:
"""Integration tests for DeauthDetector with mocked scapy."""
@patch('utils.wifi.deauth_detector.time.time')
def test_process_deauth_packet_generates_alert(self, mock_time):
"""Test that processing packets generates alert when threshold exceeded."""
mock_time.return_value = 1000.0
callback = MagicMock()
detector = DeauthDetector(
interface='wlan0mon',
event_callback=callback,
)
# Create a mock scapy packet
mock_pkt = MagicMock()
# Mock Dot11Deauth layer
mock_deauth = MagicMock()
mock_deauth.reason = 7
# Mock Dot11 layer
mock_dot11 = MagicMock()
mock_dot11.addr1 = '11:22:33:44:55:66' # dst
mock_dot11.addr2 = 'AA:BB:CC:DD:EE:FF' # src
mock_dot11.addr3 = '99:88:77:66:55:44' # bssid
# Mock RadioTap layer
mock_radiotap = MagicMock()
mock_radiotap.dBm_AntSignal = -50
# Set up haslayer behavior
def haslayer_side_effect(layer):
if 'Dot11Deauth' in str(layer):
return True
if 'Dot11Disas' in str(layer):
return False
if 'RadioTap' in str(layer):
return True
return False
mock_pkt.haslayer = haslayer_side_effect
# Set up __getitem__ behavior
def getitem_side_effect(layer):
if 'Dot11Deauth' in str(layer):
return mock_deauth
if 'Dot11' in str(layer) and 'Deauth' not in str(layer):
return mock_dot11
if 'RadioTap' in str(layer):
return mock_radiotap
return MagicMock()
mock_pkt.__getitem__ = getitem_side_effect
# Patch the scapy imports inside _process_deauth_packet
with patch('utils.wifi.deauth_detector.DeauthDetector._process_deauth_packet.__globals__', {
'Dot11': MagicMock,
'Dot11Deauth': MagicMock,
'Dot11Disas': MagicMock,
'RadioTap': MagicMock,
}):
# Process enough packets to trigger alert
for i in range(DEAUTH_ALERT_THRESHOLD + 5):
mock_time.return_value = 1000.0 + i * 0.1
# Manually simulate what _process_deauth_packet does
pkt_info = DeauthPacketInfo(
timestamp=mock_time.return_value,
frame_type='deauth',
src_mac='AA:BB:CC:DD:EE:FF',
dst_mac='11:22:33:44:55:66',
bssid='99:88:77:66:55:44',
reason_code=7,
signal_dbm=-50,
)
detector._packets_captured += 1
tracker_key = ('AA:BB:CC:DD:EE:FF', '11:22:33:44:55:66', '99:88:77:66:55:44')
tracker = detector._trackers[tracker_key]
tracker.add_packet(pkt_info)
packets_in_window = tracker.get_packets_in_window(DEAUTH_DETECTION_WINDOW)
packet_count = len(packets_in_window)
if packet_count >= DEAUTH_ALERT_THRESHOLD and not tracker.alert_sent:
alert = detector._generate_alert(
tracker_key=tracker_key,
packets=packets_in_window,
packet_count=packet_count,
)
detector._alerts.append(alert)
detector._alerts_generated += 1
tracker.alert_sent = True
detector.event_callback(alert.to_dict())
# Verify alert was generated
assert detector._alerts_generated == 1
assert len(detector._alerts) == 1
assert callback.called
# Verify callback was called with alert data
call_args = callback.call_args[0][0]
assert call_args['type'] == 'deauth_alert'
assert call_args['attacker']['mac'] == 'AA:BB:CC:DD:EE:FF'
assert call_args['target']['mac'] == '11:22:33:44:55:66'

View File

@@ -254,3 +254,23 @@ MAX_DSC_MESSAGE_AGE_SECONDS = 3600 # 1 hour
# DSC process termination timeout
DSC_TERMINATE_TIMEOUT = 3
# =============================================================================
# DEAUTH ATTACK DETECTION
# =============================================================================
# Time window for grouping deauth packets (seconds)
DEAUTH_DETECTION_WINDOW = 5
# Number of deauth packets in window to trigger alert
DEAUTH_ALERT_THRESHOLD = 10
# Number of deauth packets in window for critical severity
DEAUTH_CRITICAL_THRESHOLD = 50
# Maximum age for deauth alerts in DataStore (seconds)
MAX_DEAUTH_ALERTS_AGE_SECONDS = 300 # 5 minutes
# Deauth detector sniff timeout (seconds)
DEAUTH_SNIFF_TIMEOUT = 0.5

View File

@@ -0,0 +1,616 @@
"""
Deauthentication attack detector using scapy.
Monitors a WiFi interface in monitor mode for deauthentication and disassociation
frames, detecting potential deauth flood attacks.
"""
from __future__ import annotations
import logging
import threading
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional, Any
from utils.constants import (
DEAUTH_DETECTION_WINDOW,
DEAUTH_ALERT_THRESHOLD,
DEAUTH_CRITICAL_THRESHOLD,
DEAUTH_SNIFF_TIMEOUT,
)
logger = logging.getLogger(__name__)
# Deauth reason code descriptions
DEAUTH_REASON_CODES = {
0: "Reserved",
1: "Unspecified reason",
2: "Previous authentication no longer valid",
3: "Station is leaving (or has left) IBSS or ESS",
4: "Disassociated due to inactivity",
5: "Disassociated because AP is unable to handle all currently associated STAs",
6: "Class 2 frame received from nonauthenticated STA",
7: "Class 3 frame received from nonassociated STA",
8: "Disassociated because sending STA is leaving (or has left) BSS",
9: "STA requesting (re)association is not authenticated with responding STA",
10: "Disassociated because the information in the Power Capability element is unacceptable",
11: "Disassociated because the information in the Supported Channels element is unacceptable",
12: "Disassociated due to BSS Transition Management",
13: "Invalid information element",
14: "MIC failure",
15: "4-Way Handshake timeout",
16: "Group Key Handshake timeout",
17: "Information element in 4-Way Handshake different from (Re)Association Request/Probe Response/Beacon frame",
18: "Invalid group cipher",
19: "Invalid pairwise cipher",
20: "Invalid AKMP",
21: "Unsupported RSNE version",
22: "Invalid RSNE capabilities",
23: "IEEE 802.1X authentication failed",
24: "Cipher suite rejected because of security policy",
}
@dataclass
class DeauthPacketInfo:
"""Information about a captured deauth/disassoc packet."""
timestamp: float
frame_type: str # 'deauth' or 'disassoc'
src_mac: str
dst_mac: str
bssid: str
reason_code: int
signal_dbm: Optional[int] = None
@dataclass
class DeauthTracker:
"""Tracks deauth packets for a specific source/dest/bssid combination."""
packets: list[DeauthPacketInfo] = field(default_factory=list)
first_seen: float = 0.0
last_seen: float = 0.0
alert_sent: bool = False
def add_packet(self, pkt: DeauthPacketInfo):
self.packets.append(pkt)
now = pkt.timestamp
if self.first_seen == 0.0:
self.first_seen = now
self.last_seen = now
def get_packets_in_window(self, window_seconds: float) -> list[DeauthPacketInfo]:
"""Get packets within the time window."""
cutoff = time.time() - window_seconds
return [p for p in self.packets if p.timestamp >= cutoff]
def cleanup_old_packets(self, window_seconds: float):
"""Remove packets older than the window."""
cutoff = time.time() - window_seconds
self.packets = [p for p in self.packets if p.timestamp >= cutoff]
if self.packets:
self.first_seen = self.packets[0].timestamp
else:
self.first_seen = 0.0
self.alert_sent = False
@dataclass
class DeauthAlert:
"""A deauthentication attack alert."""
id: str
timestamp: float
severity: str # 'low', 'medium', 'high'
# Attacker info
attacker_mac: str
attacker_vendor: Optional[str]
attacker_signal_dbm: Optional[int]
is_spoofed_ap: bool
# Target info
target_mac: str
target_vendor: Optional[str]
target_type: str # 'client', 'broadcast', 'ap'
target_known_from_scan: bool
# Access point info
ap_bssid: str
ap_essid: Optional[str]
ap_channel: Optional[int]
# Attack info
frame_type: str
reason_code: int
reason_text: str
packet_count: int
window_seconds: float
packets_per_second: float
# Analysis
attack_type: str # 'targeted', 'broadcast', 'ap_flood'
description: str
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'id': self.id,
'type': 'deauth_alert',
'timestamp': self.timestamp,
'severity': self.severity,
'attacker': {
'mac': self.attacker_mac,
'vendor': self.attacker_vendor,
'signal_dbm': self.attacker_signal_dbm,
'is_spoofed_ap': self.is_spoofed_ap,
},
'target': {
'mac': self.target_mac,
'vendor': self.target_vendor,
'type': self.target_type,
'known_from_scan': self.target_known_from_scan,
},
'access_point': {
'bssid': self.ap_bssid,
'essid': self.ap_essid,
'channel': self.ap_channel,
},
'attack_info': {
'frame_type': self.frame_type,
'reason_code': self.reason_code,
'reason_text': self.reason_text,
'packet_count': self.packet_count,
'window_seconds': self.window_seconds,
'packets_per_second': self.packets_per_second,
},
'analysis': {
'attack_type': self.attack_type,
'description': self.description,
},
}
class DeauthDetector:
"""
Detects deauthentication attacks using scapy.
Monitors a WiFi interface in monitor mode for deauth/disassoc frames
and emits alerts when attack thresholds are exceeded.
"""
def __init__(
self,
interface: str,
event_callback: Callable[[dict], None],
get_networks: Optional[Callable[[], dict[str, Any]]] = None,
get_clients: Optional[Callable[[], dict[str, Any]]] = None,
):
"""
Initialize the deauth detector.
Args:
interface: Monitor mode interface to sniff on
event_callback: Callback function to receive alert events
get_networks: Optional function to get current WiFi networks (bssid -> network_info)
get_clients: Optional function to get current WiFi clients (mac -> client_info)
"""
self.interface = interface
self.event_callback = event_callback
self.get_networks = get_networks
self.get_clients = get_clients
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._lock = threading.Lock()
# Track deauth packets by (src, dst, bssid) tuple
self._trackers: dict[tuple[str, str, str], DeauthTracker] = defaultdict(DeauthTracker)
# Alert history
self._alerts: list[DeauthAlert] = []
self._alert_counter = 0
# Stats
self._packets_captured = 0
self._alerts_generated = 0
self._started_at: Optional[float] = None
@property
def is_running(self) -> bool:
"""Check if detector is running."""
return self._thread is not None and self._thread.is_alive()
@property
def stats(self) -> dict:
"""Get detector statistics."""
return {
'is_running': self.is_running,
'interface': self.interface,
'started_at': self._started_at,
'packets_captured': self._packets_captured,
'alerts_generated': self._alerts_generated,
'active_trackers': len(self._trackers),
}
def start(self) -> bool:
"""
Start detection in background thread.
Returns:
True if started successfully.
"""
if self.is_running:
logger.warning("Deauth detector already running")
return True
self._stop_event.clear()
self._started_at = time.time()
self._thread = threading.Thread(
target=self._sniff_loop,
name="DeauthDetector",
daemon=True,
)
self._thread.start()
logger.info(f"Deauth detector started on {self.interface}")
return True
def stop(self) -> bool:
"""
Stop detection.
Returns:
True if stopped successfully.
"""
if not self.is_running:
return True
logger.info("Stopping deauth detector...")
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5)
if self._thread.is_alive():
logger.warning("Deauth detector thread did not stop cleanly")
self._thread = None
self._started_at = None
logger.info("Deauth detector stopped")
return True
def get_alerts(self, limit: int = 100) -> list[dict]:
"""Get recent alerts."""
with self._lock:
return [a.to_dict() for a in self._alerts[-limit:]]
def clear_alerts(self):
"""Clear alert history."""
with self._lock:
self._alerts.clear()
self._trackers.clear()
self._alert_counter = 0
def _sniff_loop(self):
"""Main sniffing loop using scapy."""
try:
from scapy.all import sniff, Dot11, Dot11Deauth, Dot11Disas
except ImportError:
logger.error("scapy not installed. Install with: pip install scapy")
self.event_callback({
'type': 'deauth_error',
'error': 'scapy not installed',
})
return
logger.info(f"Starting deauth sniff on {self.interface}")
def packet_handler(pkt):
"""Handle each captured packet."""
if self._stop_event.is_set():
return
# Check for deauth or disassoc frames
if pkt.haslayer(Dot11Deauth) or pkt.haslayer(Dot11Disas):
self._process_deauth_packet(pkt)
try:
# Use stop_filter to allow clean shutdown
sniff(
iface=self.interface,
prn=packet_handler,
store=False,
stop_filter=lambda _: self._stop_event.is_set(),
timeout=DEAUTH_SNIFF_TIMEOUT,
)
# Continue sniffing until stop is requested
while not self._stop_event.is_set():
sniff(
iface=self.interface,
prn=packet_handler,
store=False,
stop_filter=lambda _: self._stop_event.is_set(),
timeout=DEAUTH_SNIFF_TIMEOUT,
)
# Periodic cleanup
self._cleanup_old_trackers()
except OSError as e:
if "No such device" in str(e):
logger.error(f"Interface {self.interface} not found")
self.event_callback({
'type': 'deauth_error',
'error': f'Interface {self.interface} not found',
})
else:
logger.exception(f"Sniff error: {e}")
self.event_callback({
'type': 'deauth_error',
'error': str(e),
})
except Exception as e:
logger.exception(f"Sniff error: {e}")
self.event_callback({
'type': 'deauth_error',
'error': str(e),
})
def _process_deauth_packet(self, pkt):
"""Process a deauth/disassoc packet and emit alert if threshold exceeded."""
try:
from scapy.all import Dot11, Dot11Deauth, Dot11Disas, RadioTap
except ImportError:
return
# Determine frame type
if pkt.haslayer(Dot11Deauth):
frame_type = 'deauth'
reason_code = pkt[Dot11Deauth].reason
elif pkt.haslayer(Dot11Disas):
frame_type = 'disassoc'
reason_code = pkt[Dot11Disas].reason
else:
return
# Extract addresses from Dot11 layer
dot11 = pkt[Dot11]
dst_mac = (dot11.addr1 or '').upper()
src_mac = (dot11.addr2 or '').upper()
bssid = (dot11.addr3 or '').upper()
# Skip if addresses are missing
if not src_mac or not dst_mac:
return
# Extract signal strength from RadioTap if available
signal_dbm = None
if pkt.haslayer(RadioTap):
try:
signal_dbm = pkt[RadioTap].dBm_AntSignal
except AttributeError:
pass
# Create packet info
pkt_info = DeauthPacketInfo(
timestamp=time.time(),
frame_type=frame_type,
src_mac=src_mac,
dst_mac=dst_mac,
bssid=bssid,
reason_code=reason_code,
signal_dbm=signal_dbm,
)
self._packets_captured += 1
# Track packet
tracker_key = (src_mac, dst_mac, bssid)
with self._lock:
tracker = self._trackers[tracker_key]
tracker.add_packet(pkt_info)
# Check if threshold exceeded
packets_in_window = tracker.get_packets_in_window(DEAUTH_DETECTION_WINDOW)
packet_count = len(packets_in_window)
if packet_count >= DEAUTH_ALERT_THRESHOLD and not tracker.alert_sent:
# Generate alert
alert = self._generate_alert(
tracker_key=tracker_key,
packets=packets_in_window,
packet_count=packet_count,
)
self._alerts.append(alert)
self._alerts_generated += 1
tracker.alert_sent = True
# Emit event
self.event_callback(alert.to_dict())
logger.warning(
f"Deauth attack detected: {src_mac} -> {dst_mac} "
f"({packet_count} packets in {DEAUTH_DETECTION_WINDOW}s)"
)
def _generate_alert(
self,
tracker_key: tuple[str, str, str],
packets: list[DeauthPacketInfo],
packet_count: int,
) -> DeauthAlert:
"""Generate an alert from tracked packets."""
src_mac, dst_mac, bssid = tracker_key
# Get latest packet for details
latest_pkt = packets[-1] if packets else None
# Determine severity
if packet_count >= DEAUTH_CRITICAL_THRESHOLD:
severity = 'high'
elif packet_count >= DEAUTH_ALERT_THRESHOLD * 2.5:
severity = 'medium'
else:
severity = 'low'
# Lookup AP info
ap_info = self._lookup_ap(bssid)
# Lookup target info
target_info = self._lookup_device(dst_mac)
# Determine target type
if dst_mac == 'FF:FF:FF:FF:FF:FF':
target_type = 'broadcast'
elif dst_mac in self._get_known_aps():
target_type = 'ap'
else:
target_type = 'client'
# Check if source is spoofed (matches known AP)
is_spoofed = self._check_spoofed_source(src_mac)
# Get attacker vendor
attacker_vendor = self._get_vendor(src_mac)
# Calculate packets per second
if packets:
time_span = packets[-1].timestamp - packets[0].timestamp
pps = packet_count / time_span if time_span > 0 else float(packet_count)
else:
pps = 0.0
# Determine attack type and description
if dst_mac == 'FF:FF:FF:FF:FF:FF':
attack_type = 'broadcast'
description = "Broadcast deauth flood targeting all clients on the network"
elif target_type == 'ap':
attack_type = 'ap_flood'
description = "Deauth flood targeting access point"
else:
attack_type = 'targeted'
description = f"Targeted deauth flood against {'known' if target_info.get('known_from_scan') else 'unknown'} client"
# Get reason code info
reason_code = latest_pkt.reason_code if latest_pkt else 0
reason_text = DEAUTH_REASON_CODES.get(reason_code, f"Unknown ({reason_code})")
# Get signal
signal_dbm = None
for pkt in reversed(packets):
if pkt.signal_dbm is not None:
signal_dbm = pkt.signal_dbm
break
# Generate unique ID
self._alert_counter += 1
alert_id = f"deauth-{int(time.time())}-{self._alert_counter}"
return DeauthAlert(
id=alert_id,
timestamp=time.time(),
severity=severity,
attacker_mac=src_mac,
attacker_vendor=attacker_vendor,
attacker_signal_dbm=signal_dbm,
is_spoofed_ap=is_spoofed,
target_mac=dst_mac,
target_vendor=target_info.get('vendor'),
target_type=target_type,
target_known_from_scan=target_info.get('known_from_scan', False),
ap_bssid=bssid,
ap_essid=ap_info.get('essid'),
ap_channel=ap_info.get('channel'),
frame_type=latest_pkt.frame_type if latest_pkt else 'deauth',
reason_code=reason_code,
reason_text=reason_text,
packet_count=packet_count,
window_seconds=DEAUTH_DETECTION_WINDOW,
packets_per_second=round(pps, 1),
attack_type=attack_type,
description=description,
)
def _lookup_ap(self, bssid: str) -> dict:
"""Get AP info from current scan data."""
if not self.get_networks:
return {'bssid': bssid, 'essid': None, 'channel': None}
try:
networks = self.get_networks()
ap = networks.get(bssid.upper())
if ap:
return {
'bssid': bssid,
'essid': ap.get('essid') or ap.get('ssid'),
'channel': ap.get('channel'),
}
except Exception as e:
logger.debug(f"Error looking up AP {bssid}: {e}")
return {'bssid': bssid, 'essid': None, 'channel': None}
def _lookup_device(self, mac: str) -> dict:
"""Get device info and vendor from MAC."""
vendor = self._get_vendor(mac)
known_from_scan = False
if self.get_clients:
try:
clients = self.get_clients()
if mac.upper() in clients:
known_from_scan = True
except Exception:
pass
return {
'mac': mac,
'vendor': vendor,
'known_from_scan': known_from_scan,
}
def _get_known_aps(self) -> set[str]:
"""Get set of known AP BSSIDs."""
if not self.get_networks:
return set()
try:
networks = self.get_networks()
return {bssid.upper() for bssid in networks.keys()}
except Exception:
return set()
def _check_spoofed_source(self, src_mac: str) -> bool:
"""Check if source MAC matches a known AP (spoofing indicator)."""
return src_mac.upper() in self._get_known_aps()
def _get_vendor(self, mac: str) -> Optional[str]:
"""Get vendor from MAC OUI."""
try:
from data.oui import get_manufacturer
vendor = get_manufacturer(mac)
return vendor if vendor != 'Unknown' else None
except Exception:
pass
# Fallback to wifi constants
try:
from utils.wifi.constants import get_vendor_from_mac
return get_vendor_from_mac(mac)
except Exception:
return None
def _cleanup_old_trackers(self):
"""Remove old packets and empty trackers."""
with self._lock:
keys_to_remove = []
for key, tracker in self._trackers.items():
tracker.cleanup_old_packets(DEAUTH_DETECTION_WINDOW * 2)
if not tracker.packets:
keys_to_remove.append(key)
for key in keys_to_remove:
del self._trackers[key]

View File

@@ -19,7 +19,10 @@ import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Callable, Generator, Optional
from typing import Callable, Generator, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from .deauth_detector import DeauthDetector
from .constants import (
DEFAULT_QUICK_SCAN_TIMEOUT,
@@ -87,6 +90,9 @@ class UnifiedWiFiScanner:
self._deep_scan_thread: Optional[threading.Thread] = None
self._deep_scan_stop_event = threading.Event()
# Deauth detector
self._deauth_detector: Optional['DeauthDetector'] = None
# Event queue for SSE streaming
self._event_queue: queue.Queue = queue.Queue(maxsize=1000)
@@ -623,6 +629,9 @@ class UnifiedWiFiScanner:
'interface': iface,
})
# Auto-start deauth detector
self._start_deauth_detector(iface)
return True
def stop_deep_scan(self) -> bool:
@@ -636,6 +645,9 @@ class UnifiedWiFiScanner:
if not self._status.is_scanning:
return True
# Stop deauth detector first
self._stop_deauth_detector()
self._deep_scan_stop_event.set()
if self._deep_scan_process:
@@ -1148,6 +1160,107 @@ class UnifiedWiFiScanner:
with self._lock:
return [ap.to_legacy_dict() for ap in self._access_points.values()]
# =========================================================================
# Deauth Detection Integration
# =========================================================================
def _start_deauth_detector(self, interface: str):
"""Start deauth detector on the given interface."""
try:
from .deauth_detector import DeauthDetector
except ImportError as e:
logger.warning(f"Could not import DeauthDetector (scapy not installed?): {e}")
return
if self._deauth_detector and self._deauth_detector.is_running:
logger.debug("Deauth detector already running")
return
def event_callback(event: dict):
"""Handle deauth events and forward to queue."""
self._queue_event(event)
# Also store in app-level DataStore if available
try:
import app as app_module
if hasattr(app_module, 'deauth_alerts') and event.get('type') == 'deauth_alert':
alert_id = event.get('id', str(time.time()))
app_module.deauth_alerts[alert_id] = event
if hasattr(app_module, 'deauth_detector_queue'):
try:
app_module.deauth_detector_queue.put_nowait(event)
except queue.Full:
pass
except Exception as e:
logger.debug(f"Error storing deauth alert: {e}")
def get_networks() -> dict:
"""Get current networks for cross-reference."""
with self._lock:
return {bssid: ap.to_summary_dict() for bssid, ap in self._access_points.items()}
def get_clients() -> dict:
"""Get current clients for cross-reference."""
with self._lock:
return {mac: client.to_dict() for mac, client in self._clients.items()}
try:
self._deauth_detector = DeauthDetector(
interface=interface,
event_callback=event_callback,
get_networks=get_networks,
get_clients=get_clients,
)
self._deauth_detector.start()
logger.info(f"Deauth detector started on {interface}")
self._queue_event({
'type': 'deauth_detector_started',
'interface': interface,
})
except Exception as e:
logger.error(f"Failed to start deauth detector: {e}")
self._queue_event({
'type': 'deauth_error',
'error': f"Failed to start deauth detector: {e}",
})
def _stop_deauth_detector(self):
"""Stop the deauth detector."""
if self._deauth_detector:
try:
self._deauth_detector.stop()
logger.info("Deauth detector stopped")
self._queue_event({
'type': 'deauth_detector_stopped',
})
except Exception as e:
logger.error(f"Error stopping deauth detector: {e}")
finally:
self._deauth_detector = None
@property
def deauth_detector(self) -> Optional['DeauthDetector']:
"""Get the deauth detector instance."""
return self._deauth_detector
def get_deauth_alerts(self, limit: int = 100) -> list[dict]:
"""Get recent deauth alerts."""
if self._deauth_detector:
return self._deauth_detector.get_alerts(limit)
return []
def clear_deauth_alerts(self):
"""Clear deauth alert history."""
if self._deauth_detector:
self._deauth_detector.clear_alerts()
# Also clear from app-level store
try:
import app as app_module
if hasattr(app_module, 'deauth_alerts'):
app_module.deauth_alerts.clear()
except Exception:
pass
# =============================================================================
# Module-level functions