Files
intercept/utils/wifi/models.py
Smittix e00fbfddc1 v2.26.0: fix SSE fanout crash and branded logo FOUC
- Fix SSE fanout thread AttributeError when source queue is None during
  interpreter shutdown by snapshotting to local variable with null guard
- Fix branded "i" logo rendering oversized on first page load (FOUC) by
  adding inline width/height to SVG elements across 10 templates
- Bump version to 2.26.0 in config.py, pyproject.toml, and CHANGELOG.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 11:51:27 +00:00

654 lines
20 KiB
Python

"""
WiFi data models for the unified scanner.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from .constants import (
AUTH_UNKNOWN,
BAND_UNKNOWN,
CIPHER_UNKNOWN,
PROXIMITY_UNKNOWN,
SCAN_MODE_QUICK,
SECURITY_UNKNOWN,
SIGNAL_UNKNOWN,
WIDTH_UNKNOWN,
get_band_from_channel,
get_vendor_from_mac,
)
@dataclass
class WiFiObservation:
"""Represents a single WiFi access point scan result."""
timestamp: datetime
bssid: str
essid: str | None = None
channel: int | None = None
frequency_mhz: int | None = None
rssi: int | None = None
# Security
security: str = SECURITY_UNKNOWN
cipher: str = CIPHER_UNKNOWN
auth: str = AUTH_UNKNOWN
# Additional info
width: str = WIDTH_UNKNOWN
beacon_count: int = 0
data_count: int = 0
@property
def is_hidden(self) -> bool:
"""Check if this is a hidden network."""
return not self.essid or self.essid.strip() == ''
@property
def band(self) -> str:
"""Get WiFi band from channel."""
if self.channel:
return get_band_from_channel(self.channel)
return BAND_UNKNOWN
@property
def vendor(self) -> str | None:
"""Get vendor name from BSSID."""
return get_vendor_from_mac(self.bssid)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'timestamp': self.timestamp.isoformat(),
'bssid': self.bssid,
'essid': self.essid,
'is_hidden': self.is_hidden,
'channel': self.channel,
'frequency_mhz': self.frequency_mhz,
'band': self.band,
'rssi': self.rssi,
'security': self.security,
'cipher': self.cipher,
'auth': self.auth,
'width': self.width,
'beacon_count': self.beacon_count,
'data_count': self.data_count,
'vendor': self.vendor,
}
@dataclass
class WiFiAccessPoint:
"""Aggregated WiFi access point data over time."""
# Identity
bssid: str
essid: str | None = None
is_hidden: bool = False
revealed_essid: str | None = None # Revealed through correlation
# Radio info
channel: int | None = None
frequency_mhz: int | None = None
band: str = BAND_UNKNOWN
width: str = WIDTH_UNKNOWN
# Signal aggregation
rssi_samples: list[tuple[datetime, int]] = field(default_factory=list)
rssi_current: int | None = None
rssi_median: float | None = None
rssi_min: int | None = None
rssi_max: int | None = None
rssi_variance: float | None = None
rssi_ema: float | None = None
# Proximity/signal bands
signal_band: str = SIGNAL_UNKNOWN
proximity_band: str = PROXIMITY_UNKNOWN
estimated_distance_m: float | None = None
distance_confidence: float = 0.0
# Security
security: str = SECURITY_UNKNOWN
cipher: str = CIPHER_UNKNOWN
auth: str = AUTH_UNKNOWN
# Timestamps
first_seen: datetime = field(default_factory=datetime.now)
last_seen: datetime = field(default_factory=datetime.now)
seen_count: int = 0
seen_rate: float = 0.0 # Observations per minute
# Traffic stats
beacon_count: int = 0
data_count: int = 0
client_count: int = 0
# Metadata
vendor: str | None = None
# Heuristic flags
heuristic_flags: list[str] = field(default_factory=list)
is_new: bool = False
is_persistent: bool = False
is_strong_stable: bool = False
# Baseline tracking
in_baseline: bool = False
baseline_id: int | None = None
@property
def display_name(self) -> str:
"""Get display name (revealed SSID, ESSID, or BSSID)."""
if self.revealed_essid:
return f"{self.revealed_essid} (revealed)"
if self.essid and not self.is_hidden:
return self.essid
return f"[Hidden] {self.bssid}"
@property
def age_seconds(self) -> float:
"""Seconds since last seen."""
return (datetime.now() - self.last_seen).total_seconds()
@property
def duration_seconds(self) -> float:
"""Total duration from first to last seen."""
return (self.last_seen - self.first_seen).total_seconds()
def get_rssi_history(self, max_points: int = 50) -> list[dict]:
"""Get RSSI history for visualization."""
if not self.rssi_samples:
return []
samples = self.rssi_samples[-max_points:]
return [
{'timestamp': ts.isoformat(), 'rssi': rssi}
for ts, rssi in samples
]
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
# Identity
'bssid': self.bssid,
'essid': self.essid,
'display_name': self.display_name,
'is_hidden': self.is_hidden,
'revealed_essid': self.revealed_essid,
# Radio
'channel': self.channel,
'frequency_mhz': self.frequency_mhz,
'band': self.band,
'width': self.width,
# Signal
'rssi_current': self.rssi_current,
'rssi_median': round(self.rssi_median, 1) if self.rssi_median else None,
'rssi_min': self.rssi_min,
'rssi_max': self.rssi_max,
'rssi_variance': round(self.rssi_variance, 2) if self.rssi_variance else None,
'rssi_ema': round(self.rssi_ema, 1) if self.rssi_ema else None,
'rssi_history': self.get_rssi_history(),
# Proximity
'signal_band': self.signal_band,
'proximity_band': self.proximity_band,
'estimated_distance_m': round(self.estimated_distance_m, 2) if self.estimated_distance_m else None,
'distance_confidence': round(self.distance_confidence, 2),
# Security
'security': self.security,
'cipher': self.cipher,
'auth': self.auth,
# Timestamps
'first_seen': self.first_seen.isoformat(),
'last_seen': self.last_seen.isoformat(),
'age_seconds': round(self.age_seconds, 1),
'duration_seconds': round(self.duration_seconds, 1),
'seen_count': self.seen_count,
'seen_rate': round(self.seen_rate, 2),
# Traffic
'beacon_count': self.beacon_count,
'data_count': self.data_count,
'client_count': self.client_count,
# Metadata
'vendor': self.vendor,
# Heuristics
'heuristic_flags': self.heuristic_flags,
'heuristics': {
'is_new': self.is_new,
'is_persistent': self.is_persistent,
'is_strong_stable': self.is_strong_stable,
},
# Baseline
'in_baseline': self.in_baseline,
'baseline_id': self.baseline_id,
}
def to_summary_dict(self) -> dict:
"""Compact dictionary for list views."""
return {
'bssid': self.bssid,
'essid': self.essid,
'display_name': self.display_name,
'is_hidden': self.is_hidden,
'channel': self.channel,
'band': self.band,
'rssi_current': self.rssi_current,
'rssi_median': round(self.rssi_median, 1) if self.rssi_median else None,
'signal_band': self.signal_band,
'proximity_band': self.proximity_band,
'security': self.security,
'vendor': self.vendor,
'client_count': self.client_count,
'last_seen': self.last_seen.isoformat(),
'age_seconds': round(self.age_seconds, 1),
'heuristic_flags': self.heuristic_flags,
'in_baseline': self.in_baseline,
}
def to_legacy_dict(self) -> dict:
"""Convert to legacy format for TSCM compatibility."""
return {
'bssid': self.bssid,
'essid': self.essid or '',
'vendor': self.vendor,
'power': str(self.rssi_current) if self.rssi_current else '-100',
'channel': str(self.channel) if self.channel else '',
'privacy': self.security,
'first_seen': self.first_seen.isoformat() if self.first_seen else '',
'last_seen': self.last_seen.isoformat() if self.last_seen else '',
'beacon_count': str(self.beacon_count),
'lan_ip': '', # Not tracked in new system
}
@dataclass
class WiFiClient:
"""WiFi client (station) observed during scanning."""
# Identity
mac: str
vendor: str | None = None
# Signal
rssi_samples: list[tuple[datetime, int]] = field(default_factory=list)
rssi_current: int | None = None
rssi_median: float | None = None
rssi_min: int | None = None
rssi_max: int | None = None
rssi_ema: float | None = None
# Proximity
signal_band: str = SIGNAL_UNKNOWN
proximity_band: str = PROXIMITY_UNKNOWN
estimated_distance_m: float | None = None
# Association
associated_bssid: str | None = None
is_associated: bool = False
# Probes
probed_ssids: list[str] = field(default_factory=list)
probe_timestamps: dict[str, datetime] = field(default_factory=dict)
# Timestamps
first_seen: datetime = field(default_factory=datetime.now)
last_seen: datetime = field(default_factory=datetime.now)
seen_count: int = 0
# Traffic stats
packets_sent: int = 0
packets_received: int = 0
# Heuristics
heuristic_flags: list[str] = field(default_factory=list)
@property
def age_seconds(self) -> float:
"""Seconds since last seen."""
return (datetime.now() - self.last_seen).total_seconds()
def get_rssi_history(self, max_points: int = 50) -> list[dict]:
"""Get RSSI history for visualization."""
if not self.rssi_samples:
return []
samples = self.rssi_samples[-max_points:]
return [
{'timestamp': ts.isoformat(), 'rssi': rssi}
for ts, rssi in samples
]
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'mac': self.mac,
'vendor': self.vendor,
# Signal
'rssi_current': self.rssi_current,
'rssi_median': round(self.rssi_median, 1) if self.rssi_median else None,
'rssi_min': self.rssi_min,
'rssi_max': self.rssi_max,
'rssi_ema': round(self.rssi_ema, 1) if self.rssi_ema else None,
'rssi_history': self.get_rssi_history(),
# Proximity
'signal_band': self.signal_band,
'proximity_band': self.proximity_band,
'estimated_distance_m': round(self.estimated_distance_m, 2) if self.estimated_distance_m else None,
# Association
'associated_bssid': self.associated_bssid,
'is_associated': self.is_associated,
# Probes
'probed_ssids': self.probed_ssids,
'probe_count': len(self.probed_ssids),
# Timestamps
'first_seen': self.first_seen.isoformat(),
'last_seen': self.last_seen.isoformat(),
'age_seconds': round(self.age_seconds, 1),
'seen_count': self.seen_count,
# Traffic
'packets_sent': self.packets_sent,
'packets_received': self.packets_received,
# Heuristics
'heuristic_flags': self.heuristic_flags,
}
@dataclass
class WiFiProbeRequest:
"""A single probe request captured during scanning."""
timestamp: datetime
client_mac: str
probed_ssid: str
rssi: int | None = None
client_vendor: str | None = None
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'timestamp': self.timestamp.isoformat(),
'client_mac': self.client_mac,
'probed_ssid': self.probed_ssid,
'rssi': self.rssi,
'client_vendor': self.client_vendor,
}
@dataclass
class ChannelStats:
"""Statistics for a single WiFi channel."""
channel: int
band: str = BAND_UNKNOWN
frequency_mhz: int | None = None
# Counts
ap_count: int = 0
client_count: int = 0
# Signal stats
rssi_avg: float | None = None
rssi_min: int | None = None
rssi_max: int | None = None
# Utilization score (0.0-1.0, lower is better)
utilization_score: float = 0.0
# Recommendation rank (1 = best)
recommendation_rank: int | None = None
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'channel': self.channel,
'band': self.band,
'frequency_mhz': self.frequency_mhz,
'ap_count': self.ap_count,
'client_count': self.client_count,
'rssi_avg': round(self.rssi_avg, 1) if self.rssi_avg else None,
'rssi_min': self.rssi_min,
'rssi_max': self.rssi_max,
'utilization_score': round(self.utilization_score, 3),
'recommendation_rank': self.recommendation_rank,
}
@dataclass
class ChannelRecommendation:
"""Channel recommendation with reasoning."""
channel: int
band: str
score: float # Lower is better
reason: str
is_dfs: bool = False
recommendation_rank: int | None = None
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'channel': self.channel,
'band': self.band,
'score': round(self.score, 3),
'reason': self.reason,
'is_dfs': self.is_dfs,
'rank': self.recommendation_rank,
}
@dataclass
class WiFiScanResult:
"""Complete result from a WiFi scan operation."""
# Discovered entities
access_points: list[WiFiAccessPoint] = field(default_factory=list)
clients: list[WiFiClient] = field(default_factory=list)
probe_requests: list[WiFiProbeRequest] = field(default_factory=list)
# Channel analysis
channel_stats: list[ChannelStats] = field(default_factory=list)
recommendations: list[ChannelRecommendation] = field(default_factory=list)
# Scan metadata
scan_mode: str = SCAN_MODE_QUICK
interface: str | None = None
started_at: datetime | None = None
completed_at: datetime | None = None
duration_seconds: float | None = None
# Status
is_complete: bool = False
error: str | None = None
warnings: list[str] = field(default_factory=list)
@property
def network_count(self) -> int:
"""Total number of access points found."""
return len(self.access_points)
@property
def client_count(self) -> int:
"""Total number of clients found."""
return len(self.clients)
@property
def hidden_count(self) -> int:
"""Number of hidden networks."""
return sum(1 for ap in self.access_points if ap.is_hidden)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
# Entities
'access_points': [ap.to_dict() for ap in self.access_points],
'clients': [c.to_dict() for c in self.clients],
'probe_requests': [p.to_dict() for p in self.probe_requests],
# Channel analysis
'channel_stats': [cs.to_dict() for cs in self.channel_stats],
'recommendations': [r.to_dict() for r in self.recommendations],
# Metadata
'scan_mode': self.scan_mode,
'interface': self.interface,
'started_at': self.started_at.isoformat() if self.started_at else None,
'completed_at': self.completed_at.isoformat() if self.completed_at else None,
'duration_seconds': round(self.duration_seconds, 2) if self.duration_seconds else None,
# Stats
'network_count': self.network_count,
'client_count': self.client_count,
'hidden_count': self.hidden_count,
# Status
'is_complete': self.is_complete,
'error': self.error,
'warnings': self.warnings,
}
def to_summary_dict(self) -> dict:
"""Compact summary for status endpoints."""
return {
'scan_mode': self.scan_mode,
'interface': self.interface,
'network_count': self.network_count,
'client_count': self.client_count,
'hidden_count': self.hidden_count,
'is_complete': self.is_complete,
'error': self.error,
}
@dataclass
class WiFiScanStatus:
"""Current WiFi scanning status."""
is_scanning: bool = False
scan_mode: str = SCAN_MODE_QUICK
interface: str | None = None
started_at: datetime | None = None
networks_found: int = 0
clients_found: int = 0
error: str | None = None
@property
def elapsed_seconds(self) -> float | None:
"""Seconds since scan started."""
if self.started_at:
return (datetime.now() - self.started_at).total_seconds()
return None
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'is_scanning': self.is_scanning,
'scan_mode': self.scan_mode,
'interface': self.interface,
'started_at': self.started_at.isoformat() if self.started_at else None,
'elapsed_seconds': round(self.elapsed_seconds, 1) if self.elapsed_seconds else None,
'networks_found': self.networks_found,
'clients_found': self.clients_found,
'error': self.error,
}
@dataclass
class WiFiCapabilities:
"""WiFi system capabilities check result."""
# Platform
platform: str = 'unknown' # 'linux', 'darwin', 'windows'
is_root: bool = False
# Interfaces
interfaces: list[dict] = field(default_factory=list)
default_interface: str | None = None
# Quick scan tools
has_nmcli: bool = False
has_iw: bool = False
has_iwlist: bool = False
has_airport: bool = False
preferred_quick_tool: str | None = None
# Deep scan tools
has_airmon_ng: bool = False
has_airodump_ng: bool = False
has_monitor_capable_interface: bool = False
monitor_interface: str | None = None
# Issues
issues: list[str] = field(default_factory=list)
@property
def can_quick_scan(self) -> bool:
"""Whether quick scanning is available."""
return (
self.has_nmcli or
self.has_iw or
self.has_iwlist or
self.has_airport
) and len(self.interfaces) > 0
@property
def can_deep_scan(self) -> bool:
"""Whether deep scanning is available."""
return (
self.has_airmon_ng and
self.has_airodump_ng and
self.has_monitor_capable_interface and
self.is_root
)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
# Status
'available': self.can_quick_scan,
'can_quick_scan': self.can_quick_scan,
'can_deep_scan': self.can_deep_scan,
# Platform
'platform': self.platform,
'is_root': self.is_root,
# Interfaces
'interfaces': self.interfaces,
'default_interface': self.default_interface,
# Quick scan tools
'tools': {
'nmcli': self.has_nmcli,
'iw': self.has_iw,
'iwlist': self.has_iwlist,
'airport': self.has_airport,
'airmon_ng': self.has_airmon_ng,
'airodump_ng': self.has_airodump_ng,
},
'preferred_quick_tool': self.preferred_quick_tool,
# Deep scan
'has_monitor_capable_interface': self.has_monitor_capable_interface,
'monitor_interface': self.monitor_interface,
# Issues
'issues': self.issues,
}