mirror of
https://github.com/smittix/intercept.git
synced 2026-04-25 07:10:00 -07:00
Backend: - New utils/wifi/ package with models, scanner, parsers, channel analyzer - Quick Scan mode using system tools (nmcli, iw, iwlist, airport) - Deep Scan mode using airodump-ng with monitor mode - Hidden SSID correlation engine - Channel utilization analysis with recommendations - v2 API endpoints at /wifi/v2/* with SSE streaming - TSCM integration updated to use new scanner (backwards compatible) Frontend: - WiFi mode controller (wifi.js) with dual-mode support - Channel utilization chart component (channel-chart.js) - Updated wifi.html template with scan mode tabs and export Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
328 lines
10 KiB
Python
328 lines
10 KiB
Python
"""
|
|
Hidden SSID correlation engine.
|
|
|
|
Correlates probe requests from clients with hidden access points to reveal
|
|
the actual SSID of hidden networks.
|
|
|
|
Strategy:
|
|
1. Track probe requests with source MACs and probed SSIDs
|
|
2. Track hidden networks (empty ESSID) with their BSSIDs
|
|
3. When a client probes for an SSID and then associates with a hidden AP
|
|
within a time window, correlate the SSID to the hidden AP
|
|
4. Also correlate when the same client is seen both probing for an SSID
|
|
and sending data to a hidden AP
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta
|
|
from typing import Callable, Optional
|
|
|
|
from .constants import (
|
|
HIDDEN_CORRELATION_WINDOW_SECONDS,
|
|
HIDDEN_MIN_CORRELATION_CONFIDENCE,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Global correlator instance
|
|
_correlator_instance: Optional['HiddenSSIDCorrelator'] = None
|
|
_correlator_lock = threading.Lock()
|
|
|
|
|
|
@dataclass
|
|
class ProbeRecord:
|
|
"""Record of a probe request."""
|
|
timestamp: datetime
|
|
client_mac: str
|
|
probed_ssid: str
|
|
|
|
|
|
@dataclass
|
|
class AssociationRecord:
|
|
"""Record of a client association."""
|
|
timestamp: datetime
|
|
client_mac: str
|
|
bssid: str
|
|
|
|
|
|
@dataclass
|
|
class CorrelationResult:
|
|
"""Result of an SSID correlation."""
|
|
bssid: str
|
|
revealed_ssid: str
|
|
client_mac: str
|
|
confidence: float
|
|
correlation_time: datetime
|
|
method: str # 'probe_association', 'data_correlation'
|
|
|
|
|
|
class HiddenSSIDCorrelator:
|
|
"""
|
|
Correlates probe requests with hidden APs to reveal their SSIDs.
|
|
|
|
Uses time-based correlation: when a client probes for an SSID and
|
|
then is seen communicating with a hidden AP, the SSID is likely
|
|
that of the hidden network.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
correlation_window: float = HIDDEN_CORRELATION_WINDOW_SECONDS,
|
|
min_confidence: float = HIDDEN_MIN_CORRELATION_CONFIDENCE,
|
|
):
|
|
"""
|
|
Initialize the correlator.
|
|
|
|
Args:
|
|
correlation_window: Time window for correlation (seconds).
|
|
min_confidence: Minimum confidence to report a correlation.
|
|
"""
|
|
self.correlation_window = correlation_window
|
|
self.min_confidence = min_confidence
|
|
self._lock = threading.Lock()
|
|
|
|
# Storage
|
|
self._probe_records: list[ProbeRecord] = []
|
|
self._association_records: list[AssociationRecord] = []
|
|
self._hidden_aps: dict[str, datetime] = {} # BSSID -> last_seen
|
|
self._revealed: dict[str, CorrelationResult] = {} # BSSID -> result
|
|
|
|
# Callbacks
|
|
self._on_ssid_revealed: Optional[Callable[[CorrelationResult], None]] = None
|
|
|
|
def record_probe(self, client_mac: str, probed_ssid: str, timestamp: Optional[datetime] = None):
|
|
"""
|
|
Record a probe request.
|
|
|
|
Args:
|
|
client_mac: MAC address of the probing client.
|
|
probed_ssid: SSID being probed for.
|
|
timestamp: Time of the probe (defaults to now).
|
|
"""
|
|
if not client_mac or not probed_ssid:
|
|
return
|
|
|
|
timestamp = timestamp or datetime.now()
|
|
client_mac = client_mac.upper()
|
|
|
|
with self._lock:
|
|
self._probe_records.append(ProbeRecord(
|
|
timestamp=timestamp,
|
|
client_mac=client_mac,
|
|
probed_ssid=probed_ssid,
|
|
))
|
|
|
|
# Prune old records
|
|
self._prune_records()
|
|
|
|
# Check for correlations with known hidden APs
|
|
self._check_correlations()
|
|
|
|
def record_association(self, client_mac: str, bssid: str, timestamp: Optional[datetime] = None):
|
|
"""
|
|
Record a client association with an AP.
|
|
|
|
Args:
|
|
client_mac: MAC address of the client.
|
|
bssid: BSSID of the AP.
|
|
timestamp: Time of the association (defaults to now).
|
|
"""
|
|
if not client_mac or not bssid:
|
|
return
|
|
|
|
timestamp = timestamp or datetime.now()
|
|
client_mac = client_mac.upper()
|
|
bssid = bssid.upper()
|
|
|
|
with self._lock:
|
|
self._association_records.append(AssociationRecord(
|
|
timestamp=timestamp,
|
|
client_mac=client_mac,
|
|
bssid=bssid,
|
|
))
|
|
|
|
# Prune old records
|
|
self._prune_records()
|
|
|
|
# Check for correlations
|
|
self._check_correlations()
|
|
|
|
def record_hidden_ap(self, bssid: str, timestamp: Optional[datetime] = None):
|
|
"""
|
|
Record a hidden access point (empty SSID).
|
|
|
|
Args:
|
|
bssid: BSSID of the hidden AP.
|
|
timestamp: Time when seen (defaults to now).
|
|
"""
|
|
if not bssid:
|
|
return
|
|
|
|
timestamp = timestamp or datetime.now()
|
|
bssid = bssid.upper()
|
|
|
|
with self._lock:
|
|
self._hidden_aps[bssid] = timestamp
|
|
|
|
# Check for correlations
|
|
self._check_correlations()
|
|
|
|
def get_revealed_ssid(self, bssid: str) -> Optional[str]:
|
|
"""
|
|
Get the revealed SSID for a hidden AP, if known.
|
|
|
|
Args:
|
|
bssid: BSSID to look up.
|
|
|
|
Returns:
|
|
Revealed SSID or None.
|
|
"""
|
|
with self._lock:
|
|
result = self._revealed.get(bssid.upper())
|
|
return result.revealed_ssid if result else None
|
|
|
|
def get_correlation(self, bssid: str) -> Optional[CorrelationResult]:
|
|
"""
|
|
Get the full correlation result for a hidden AP.
|
|
|
|
Args:
|
|
bssid: BSSID to look up.
|
|
|
|
Returns:
|
|
CorrelationResult or None.
|
|
"""
|
|
with self._lock:
|
|
return self._revealed.get(bssid.upper())
|
|
|
|
def get_all_revealed(self) -> dict[str, str]:
|
|
"""
|
|
Get all revealed SSID mappings.
|
|
|
|
Returns:
|
|
Dict of BSSID -> revealed SSID.
|
|
"""
|
|
with self._lock:
|
|
return {
|
|
bssid: result.revealed_ssid
|
|
for bssid, result in self._revealed.items()
|
|
}
|
|
|
|
def set_callback(self, callback: Callable[[CorrelationResult], None]):
|
|
"""Set callback for when an SSID is revealed."""
|
|
self._on_ssid_revealed = callback
|
|
|
|
def _prune_records(self):
|
|
"""Remove records older than the correlation window."""
|
|
cutoff = datetime.now() - timedelta(seconds=self.correlation_window * 2)
|
|
|
|
self._probe_records = [
|
|
r for r in self._probe_records
|
|
if r.timestamp > cutoff
|
|
]
|
|
|
|
self._association_records = [
|
|
r for r in self._association_records
|
|
if r.timestamp > cutoff
|
|
]
|
|
|
|
def _check_correlations(self):
|
|
"""Check for new SSID correlations."""
|
|
now = datetime.now()
|
|
window = timedelta(seconds=self.correlation_window)
|
|
|
|
for bssid in list(self._hidden_aps.keys()):
|
|
# Skip if already revealed
|
|
if bssid in self._revealed:
|
|
continue
|
|
|
|
# Find associations with this hidden AP
|
|
relevant_associations = [
|
|
a for a in self._association_records
|
|
if a.bssid == bssid and (now - a.timestamp) <= window
|
|
]
|
|
|
|
if not relevant_associations:
|
|
continue
|
|
|
|
# For each associated client, look for recent probes
|
|
for assoc in relevant_associations:
|
|
client_probes = [
|
|
p for p in self._probe_records
|
|
if p.client_mac == assoc.client_mac
|
|
and abs((p.timestamp - assoc.timestamp).total_seconds()) <= self.correlation_window
|
|
]
|
|
|
|
if not client_probes:
|
|
continue
|
|
|
|
# Use the most recent probe from this client
|
|
latest_probe = max(client_probes, key=lambda p: p.timestamp)
|
|
|
|
# Calculate confidence based on timing
|
|
time_diff = abs((latest_probe.timestamp - assoc.timestamp).total_seconds())
|
|
confidence = 1.0 - (time_diff / self.correlation_window)
|
|
confidence = max(0.0, min(1.0, confidence))
|
|
|
|
if confidence >= self.min_confidence:
|
|
result = CorrelationResult(
|
|
bssid=bssid,
|
|
revealed_ssid=latest_probe.probed_ssid,
|
|
client_mac=assoc.client_mac,
|
|
confidence=confidence,
|
|
correlation_time=now,
|
|
method='probe_association',
|
|
)
|
|
|
|
self._revealed[bssid] = result
|
|
|
|
logger.info(
|
|
f"Hidden SSID revealed: {bssid} -> '{latest_probe.probed_ssid}' "
|
|
f"(confidence: {confidence:.2f})"
|
|
)
|
|
|
|
# Callback
|
|
if self._on_ssid_revealed:
|
|
try:
|
|
self._on_ssid_revealed(result)
|
|
except Exception as e:
|
|
logger.debug(f"SSID reveal callback error: {e}")
|
|
|
|
break # Found correlation, move to next AP
|
|
|
|
def clear(self):
|
|
"""Clear all stored data."""
|
|
with self._lock:
|
|
self._probe_records.clear()
|
|
self._association_records.clear()
|
|
self._hidden_aps.clear()
|
|
self._revealed.clear()
|
|
|
|
|
|
def get_hidden_correlator(
|
|
correlation_window: float = HIDDEN_CORRELATION_WINDOW_SECONDS,
|
|
min_confidence: float = HIDDEN_MIN_CORRELATION_CONFIDENCE,
|
|
) -> HiddenSSIDCorrelator:
|
|
"""
|
|
Get or create the global hidden SSID correlator instance.
|
|
|
|
Args:
|
|
correlation_window: Time window for correlation.
|
|
min_confidence: Minimum confidence threshold.
|
|
|
|
Returns:
|
|
HiddenSSIDCorrelator instance.
|
|
"""
|
|
global _correlator_instance
|
|
|
|
with _correlator_lock:
|
|
if _correlator_instance is None:
|
|
_correlator_instance = HiddenSSIDCorrelator(
|
|
correlation_window=correlation_window,
|
|
min_confidence=min_confidence,
|
|
)
|
|
return _correlator_instance
|