mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 22:59:59 -07:00
- Add v2 capabilities, quick scan, deep scan, and status endpoints - Add v2 networks, clients, probes, and channels endpoints - Add v2 SSE stream, export (CSV/JSON), and baseline management - Add recommendation_rank field to ChannelRecommendation model The frontend was already wired up to call these v2 endpoints but they were missing from the backend. This completes the WiFi module v2 API. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
656 lines
21 KiB
Python
656 lines
21 KiB
Python
"""
|
|
WiFi data models for the unified scanner.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from .constants import (
|
|
BAND_UNKNOWN,
|
|
SECURITY_UNKNOWN,
|
|
CIPHER_UNKNOWN,
|
|
AUTH_UNKNOWN,
|
|
WIDTH_UNKNOWN,
|
|
SIGNAL_UNKNOWN,
|
|
PROXIMITY_UNKNOWN,
|
|
SCAN_MODE_QUICK,
|
|
get_band_from_channel,
|
|
get_signal_band,
|
|
get_proximity_band,
|
|
get_vendor_from_mac,
|
|
)
|
|
|
|
|
|
@dataclass
class WiFiObservation:
    """One access-point sighting produced by a single WiFi scan.

    Only ``timestamp`` and ``bssid`` are required; every other field falls
    back to ``None``, zero, or the matching ``*_UNKNOWN`` constant.
    """

    timestamp: datetime
    bssid: str
    essid: Optional[str] = None
    channel: Optional[int] = None
    frequency_mhz: Optional[int] = None
    rssi: Optional[int] = None

    # Security
    security: str = SECURITY_UNKNOWN
    cipher: str = CIPHER_UNKNOWN
    auth: str = AUTH_UNKNOWN

    # Additional info
    width: str = WIDTH_UNKNOWN
    beacon_count: int = 0
    data_count: int = 0

    @property
    def is_hidden(self) -> bool:
        """True when the ESSID is missing, empty, or only whitespace."""
        name = self.essid
        if not name:
            return True
        return name.strip() == ''

    @property
    def band(self) -> str:
        """Band derived from the channel, or ``BAND_UNKNOWN`` without one."""
        if not self.channel:
            return BAND_UNKNOWN
        return get_band_from_channel(self.channel)

    @property
    def vendor(self) -> Optional[str]:
        """Vendor looked up from the BSSID via ``get_vendor_from_mac``."""
        mac = self.bssid
        return get_vendor_from_mac(mac)

    def to_dict(self) -> dict:
        """Build a JSON-serializable dictionary of this observation.

        The timestamp is emitted as an ISO 8601 string; the derived
        ``is_hidden``, ``band`` and ``vendor`` values are included.
        """
        payload = {
            'timestamp': self.timestamp.isoformat(),
            'bssid': self.bssid,
            'essid': self.essid,
            'is_hidden': self.is_hidden,
        }
        # Radio details
        payload.update({
            'channel': self.channel,
            'frequency_mhz': self.frequency_mhz,
            'band': self.band,
            'rssi': self.rssi,
        })
        # Security details
        payload.update({
            'security': self.security,
            'cipher': self.cipher,
            'auth': self.auth,
        })
        # Remaining counters and metadata
        payload.update({
            'width': self.width,
            'beacon_count': self.beacon_count,
            'data_count': self.data_count,
            'vendor': self.vendor,
        })
        return payload
|
|
|
|
|
|
@dataclass
class WiFiAccessPoint:
    """Aggregated WiFi access point data over time.

    Holds identity, radio, signal, security, timing, traffic and heuristic
    fields for one BSSID, and provides serializers for the full, summary
    and legacy dictionary layouts.
    """

    # Identity
    bssid: str
    essid: Optional[str] = None
    is_hidden: bool = False
    revealed_essid: Optional[str] = None  # Revealed through correlation

    # Radio info
    channel: Optional[int] = None
    frequency_mhz: Optional[int] = None
    band: str = BAND_UNKNOWN
    width: str = WIDTH_UNKNOWN

    # Signal aggregation
    rssi_samples: list[tuple[datetime, int]] = field(default_factory=list)
    rssi_current: Optional[int] = None
    rssi_median: Optional[float] = None
    rssi_min: Optional[int] = None
    rssi_max: Optional[int] = None
    rssi_variance: Optional[float] = None
    rssi_ema: Optional[float] = None

    # Proximity/signal bands
    signal_band: str = SIGNAL_UNKNOWN
    proximity_band: str = PROXIMITY_UNKNOWN
    estimated_distance_m: Optional[float] = None
    distance_confidence: float = 0.0

    # Security
    security: str = SECURITY_UNKNOWN
    cipher: str = CIPHER_UNKNOWN
    auth: str = AUTH_UNKNOWN

    # Timestamps
    first_seen: datetime = field(default_factory=datetime.now)
    last_seen: datetime = field(default_factory=datetime.now)
    seen_count: int = 0
    seen_rate: float = 0.0  # Observations per minute

    # Traffic stats
    beacon_count: int = 0
    data_count: int = 0
    client_count: int = 0

    # Metadata
    vendor: Optional[str] = None

    # Heuristic flags
    heuristic_flags: list[str] = field(default_factory=list)
    is_new: bool = False
    is_persistent: bool = False
    is_strong_stable: bool = False

    # Baseline tracking
    in_baseline: bool = False
    baseline_id: Optional[int] = None

    @property
    def display_name(self) -> str:
        """Get display name (revealed SSID, ESSID, or BSSID).

        Precedence: a revealed ESSID (suffixed with `` (revealed)``), then
        the ESSID of a non-hidden network, then ``[Hidden] <bssid>``.
        """
        if self.revealed_essid:
            return f"{self.revealed_essid} (revealed)"
        if self.essid and not self.is_hidden:
            return self.essid
        return f"[Hidden] {self.bssid}"

    @property
    def age_seconds(self) -> float:
        """Seconds between ``last_seen`` and ``datetime.now()``."""
        return (datetime.now() - self.last_seen).total_seconds()

    @property
    def duration_seconds(self) -> float:
        """Total duration from first to last seen, in seconds."""
        return (self.last_seen - self.first_seen).total_seconds()

    def get_rssi_history(self, max_points: int = 50) -> list[dict]:
        """Get the trailing RSSI samples for visualization.

        Args:
            max_points: Maximum number of trailing samples to return.

        Returns:
            A list of ``{'timestamp': <ISO string>, 'rssi': <value>}`` dicts
            in stored order. Empty when there are no samples or when
            ``max_points`` is not positive.
        """
        # A limit of 0 must yield nothing: slicing with [-0:] is [0:] and
        # would otherwise return every stored sample.
        if max_points <= 0 or not self.rssi_samples:
            return []
        samples = self.rssi_samples[-max_points:]
        return [
            {'timestamp': ts.isoformat(), 'rssi': rssi}
            for ts, rssi in samples
        ]

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization.

        Optional float statistics are rounded when present and emitted as
        ``None`` only when they are actually ``None``; a legitimate value of
        ``0.0`` (for example the variance of identical samples) is kept.
        """
        return {
            # Identity
            'bssid': self.bssid,
            'essid': self.essid,
            'display_name': self.display_name,
            'is_hidden': self.is_hidden,
            'revealed_essid': self.revealed_essid,

            # Radio
            'channel': self.channel,
            'frequency_mhz': self.frequency_mhz,
            'band': self.band,
            'width': self.width,

            # Signal
            'rssi_current': self.rssi_current,
            'rssi_median': round(self.rssi_median, 1) if self.rssi_median is not None else None,
            'rssi_min': self.rssi_min,
            'rssi_max': self.rssi_max,
            'rssi_variance': round(self.rssi_variance, 2) if self.rssi_variance is not None else None,
            'rssi_ema': round(self.rssi_ema, 1) if self.rssi_ema is not None else None,
            'rssi_history': self.get_rssi_history(),

            # Proximity
            'signal_band': self.signal_band,
            'proximity_band': self.proximity_band,
            'estimated_distance_m': (
                round(self.estimated_distance_m, 2)
                if self.estimated_distance_m is not None else None
            ),
            'distance_confidence': round(self.distance_confidence, 2),

            # Security
            'security': self.security,
            'cipher': self.cipher,
            'auth': self.auth,

            # Timestamps
            'first_seen': self.first_seen.isoformat(),
            'last_seen': self.last_seen.isoformat(),
            'age_seconds': round(self.age_seconds, 1),
            'duration_seconds': round(self.duration_seconds, 1),
            'seen_count': self.seen_count,
            'seen_rate': round(self.seen_rate, 2),

            # Traffic
            'beacon_count': self.beacon_count,
            'data_count': self.data_count,
            'client_count': self.client_count,

            # Metadata
            'vendor': self.vendor,

            # Heuristics
            'heuristic_flags': self.heuristic_flags,
            'heuristics': {
                'is_new': self.is_new,
                'is_persistent': self.is_persistent,
                'is_strong_stable': self.is_strong_stable,
            },

            # Baseline
            'in_baseline': self.in_baseline,
            'baseline_id': self.baseline_id,
        }

    def to_summary_dict(self) -> dict:
        """Compact dictionary for list views.

        As in ``to_dict``, ``rssi_median`` is ``None`` only when unset.
        """
        return {
            'bssid': self.bssid,
            'essid': self.essid,
            'display_name': self.display_name,
            'is_hidden': self.is_hidden,
            'channel': self.channel,
            'band': self.band,
            'rssi_current': self.rssi_current,
            'rssi_median': round(self.rssi_median, 1) if self.rssi_median is not None else None,
            'signal_band': self.signal_band,
            'proximity_band': self.proximity_band,
            'security': self.security,
            'vendor': self.vendor,
            'client_count': self.client_count,
            'last_seen': self.last_seen.isoformat(),
            'age_seconds': round(self.age_seconds, 1),
            'heuristic_flags': self.heuristic_flags,
            'in_baseline': self.in_baseline,
        }

    def to_legacy_dict(self) -> dict:
        """Convert to legacy format for TSCM compatibility.

        Values are strings. A missing (or zero) ``rssi_current`` is reported
        as ``'-100'`` and a missing (or zero) channel as ``''``; this
        layout is kept exactly as-is for existing consumers.
        """
        return {
            'bssid': self.bssid,
            'essid': self.essid or '',
            'power': str(self.rssi_current) if self.rssi_current else '-100',
            'channel': str(self.channel) if self.channel else '',
            'privacy': self.security,
            'first_seen': self.first_seen.isoformat() if self.first_seen else '',
            'last_seen': self.last_seen.isoformat() if self.last_seen else '',
            'beacon_count': str(self.beacon_count),
            'lan_ip': '',  # Not tracked in new system
        }
|
|
|
|
|
|
@dataclass
class WiFiClient:
    """WiFi client (station) observed during scanning.

    Tracks signal statistics, association state, probed SSIDs, timing and
    packet counters for one client MAC address.
    """

    # Identity
    mac: str
    vendor: Optional[str] = None

    # Signal
    rssi_samples: list[tuple[datetime, int]] = field(default_factory=list)
    rssi_current: Optional[int] = None
    rssi_median: Optional[float] = None
    rssi_min: Optional[int] = None
    rssi_max: Optional[int] = None
    rssi_ema: Optional[float] = None

    # Proximity
    signal_band: str = SIGNAL_UNKNOWN
    proximity_band: str = PROXIMITY_UNKNOWN
    estimated_distance_m: Optional[float] = None

    # Association
    associated_bssid: Optional[str] = None
    is_associated: bool = False

    # Probes
    probed_ssids: list[str] = field(default_factory=list)
    probe_timestamps: dict[str, datetime] = field(default_factory=dict)

    # Timestamps
    first_seen: datetime = field(default_factory=datetime.now)
    last_seen: datetime = field(default_factory=datetime.now)
    seen_count: int = 0

    # Traffic stats
    packets_sent: int = 0
    packets_received: int = 0

    # Heuristics
    heuristic_flags: list[str] = field(default_factory=list)

    @property
    def age_seconds(self) -> float:
        """Seconds between ``last_seen`` and ``datetime.now()``."""
        return (datetime.now() - self.last_seen).total_seconds()

    def get_rssi_history(self, max_points: int = 50) -> list[dict]:
        """Get the trailing RSSI samples for visualization.

        Args:
            max_points: Maximum number of trailing samples to return.

        Returns:
            A list of ``{'timestamp': <ISO string>, 'rssi': <value>}`` dicts
            in stored order. Empty when there are no samples or when
            ``max_points`` is not positive.
        """
        # A limit of 0 must yield nothing: slicing with [-0:] is [0:] and
        # would otherwise return every stored sample.
        if max_points <= 0 or not self.rssi_samples:
            return []
        samples = self.rssi_samples[-max_points:]
        return [
            {'timestamp': ts.isoformat(), 'rssi': rssi}
            for ts, rssi in samples
        ]

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization.

        Optional float statistics are rounded when present and emitted as
        ``None`` only when they are actually ``None``, so a legitimate
        ``0.0`` value is preserved. ``probe_count`` is the length of
        ``probed_ssids``.
        """
        return {
            'mac': self.mac,
            'vendor': self.vendor,

            # Signal
            'rssi_current': self.rssi_current,
            'rssi_median': round(self.rssi_median, 1) if self.rssi_median is not None else None,
            'rssi_min': self.rssi_min,
            'rssi_max': self.rssi_max,
            'rssi_ema': round(self.rssi_ema, 1) if self.rssi_ema is not None else None,
            'rssi_history': self.get_rssi_history(),

            # Proximity
            'signal_band': self.signal_band,
            'proximity_band': self.proximity_band,
            'estimated_distance_m': (
                round(self.estimated_distance_m, 2)
                if self.estimated_distance_m is not None else None
            ),

            # Association
            'associated_bssid': self.associated_bssid,
            'is_associated': self.is_associated,

            # Probes
            'probed_ssids': self.probed_ssids,
            'probe_count': len(self.probed_ssids),

            # Timestamps
            'first_seen': self.first_seen.isoformat(),
            'last_seen': self.last_seen.isoformat(),
            'age_seconds': round(self.age_seconds, 1),
            'seen_count': self.seen_count,

            # Traffic
            'packets_sent': self.packets_sent,
            'packets_received': self.packets_received,

            # Heuristics
            'heuristic_flags': self.heuristic_flags,
        }
|
|
|
|
|
|
@dataclass
class WiFiProbeRequest:
    """Record of one probe request seen while scanning."""

    timestamp: datetime
    client_mac: str
    probed_ssid: str
    rssi: Optional[int] = None
    client_vendor: Optional[str] = None

    def to_dict(self) -> dict:
        """Return a JSON-friendly mapping of this probe request.

        The timestamp is rendered as an ISO 8601 string; all other fields
        are copied through unchanged.
        """
        payload = {'timestamp': self.timestamp.isoformat()}
        # Remaining fields are emitted verbatim, in declaration order.
        for name in ('client_mac', 'probed_ssid', 'rssi', 'client_vendor'):
            payload[name] = getattr(self, name)
        return payload
|
|
|
|
|
|
@dataclass
class ChannelStats:
    """Statistics for a single WiFi channel."""

    channel: int
    band: str = BAND_UNKNOWN
    frequency_mhz: Optional[int] = None

    # Counts
    ap_count: int = 0
    client_count: int = 0

    # Signal stats
    rssi_avg: Optional[float] = None
    rssi_min: Optional[int] = None
    rssi_max: Optional[int] = None

    # Utilization score (0.0-1.0, lower is better)
    utilization_score: float = 0.0

    # Recommendation rank (1 = best)
    recommendation_rank: Optional[int] = None

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization.

        ``rssi_avg`` is rounded to one decimal when set and is ``None`` only
        when it is actually ``None`` (an average of ``0.0`` is preserved).
        ``utilization_score`` is rounded to three decimals.
        """
        return {
            'channel': self.channel,
            'band': self.band,
            'frequency_mhz': self.frequency_mhz,
            'ap_count': self.ap_count,
            'client_count': self.client_count,
            'rssi_avg': round(self.rssi_avg, 1) if self.rssi_avg is not None else None,
            'rssi_min': self.rssi_min,
            'rssi_max': self.rssi_max,
            'utilization_score': round(self.utilization_score, 3),
            'recommendation_rank': self.recommendation_rank,
        }
|
|
|
|
|
|
@dataclass
class ChannelRecommendation:
    """A suggested channel together with the reasoning behind it."""

    channel: int
    band: str
    score: float  # Lower is better
    reason: str
    is_dfs: bool = False
    recommendation_rank: Optional[int] = None

    def to_dict(self) -> dict:
        """Return a JSON-friendly mapping of this recommendation.

        The score is rounded to three decimals, and the rank is exposed
        under the ``'rank'`` key rather than ``'recommendation_rank'``.
        """
        result = {}
        result['channel'] = self.channel
        result['band'] = self.band
        result['score'] = round(self.score, 3)
        result['reason'] = self.reason
        result['is_dfs'] = self.is_dfs
        result['rank'] = self.recommendation_rank
        return result
|
|
|
|
|
|
@dataclass
class WiFiScanResult:
    """Complete result from a WiFi scan operation.

    Bundles the discovered access points, clients and probe requests with
    channel analysis, scan metadata and completion/error status.
    """

    # Discovered entities
    access_points: list[WiFiAccessPoint] = field(default_factory=list)
    clients: list[WiFiClient] = field(default_factory=list)
    probe_requests: list[WiFiProbeRequest] = field(default_factory=list)

    # Channel analysis
    channel_stats: list[ChannelStats] = field(default_factory=list)
    recommendations: list[ChannelRecommendation] = field(default_factory=list)

    # Scan metadata
    scan_mode: str = SCAN_MODE_QUICK
    interface: Optional[str] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    duration_seconds: Optional[float] = None

    # Status
    is_complete: bool = False
    error: Optional[str] = None
    warnings: list[str] = field(default_factory=list)

    @property
    def network_count(self) -> int:
        """Total number of access points found."""
        return len(self.access_points)

    @property
    def client_count(self) -> int:
        """Total number of clients found."""
        return len(self.clients)

    @property
    def hidden_count(self) -> int:
        """Number of access points whose ``is_hidden`` flag is set."""
        return sum(1 for ap in self.access_points if ap.is_hidden)

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization.

        Every contained entity is serialized through its own ``to_dict``.
        Timestamps become ISO 8601 strings when set. ``duration_seconds`` is
        rounded to two decimals and is ``None`` only when it is actually
        ``None`` (a duration of ``0.0`` is preserved).
        """
        return {
            # Entities
            'access_points': [ap.to_dict() for ap in self.access_points],
            'clients': [c.to_dict() for c in self.clients],
            'probe_requests': [p.to_dict() for p in self.probe_requests],

            # Channel analysis
            'channel_stats': [cs.to_dict() for cs in self.channel_stats],
            'recommendations': [r.to_dict() for r in self.recommendations],

            # Metadata
            'scan_mode': self.scan_mode,
            'interface': self.interface,
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            'duration_seconds': (
                round(self.duration_seconds, 2)
                if self.duration_seconds is not None else None
            ),

            # Stats
            'network_count': self.network_count,
            'client_count': self.client_count,
            'hidden_count': self.hidden_count,

            # Status
            'is_complete': self.is_complete,
            'error': self.error,
            'warnings': self.warnings,
        }

    def to_summary_dict(self) -> dict:
        """Compact summary for status endpoints (counts and status only)."""
        return {
            'scan_mode': self.scan_mode,
            'interface': self.interface,
            'network_count': self.network_count,
            'client_count': self.client_count,
            'hidden_count': self.hidden_count,
            'is_complete': self.is_complete,
            'error': self.error,
        }
|
|
|
|
|
|
@dataclass
class WiFiScanStatus:
    """Current WiFi scanning status."""

    is_scanning: bool = False
    scan_mode: str = SCAN_MODE_QUICK
    interface: Optional[str] = None
    started_at: Optional[datetime] = None
    networks_found: int = 0
    clients_found: int = 0
    error: Optional[str] = None

    @property
    def elapsed_seconds(self) -> Optional[float]:
        """Seconds since scan started, or ``None`` when no start time is set.

        Computed against ``datetime.now()`` on every access.
        """
        if self.started_at:
            return (datetime.now() - self.started_at).total_seconds()
        return None

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization.

        ``elapsed_seconds`` is rounded to one decimal and is ``None`` only
        when no start time is set (an elapsed time of ``0.0`` is preserved).
        """
        # Read the property once: each access calls datetime.now(), so a
        # second read could see a different value than the one tested.
        elapsed = self.elapsed_seconds
        return {
            'is_scanning': self.is_scanning,
            'scan_mode': self.scan_mode,
            'interface': self.interface,
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'elapsed_seconds': round(elapsed, 1) if elapsed is not None else None,
            'networks_found': self.networks_found,
            'clients_found': self.clients_found,
            'error': self.error,
        }
|
|
|
|
|
|
@dataclass
class WiFiCapabilities:
    """Outcome of probing the host for WiFi scanning support."""

    # Platform
    platform: str = 'unknown'  # 'linux', 'darwin', 'windows'
    is_root: bool = False

    # Interfaces
    interfaces: list[dict] = field(default_factory=list)
    default_interface: Optional[str] = None

    # Quick scan tools
    has_nmcli: bool = False
    has_iw: bool = False
    has_iwlist: bool = False
    has_airport: bool = False
    preferred_quick_tool: Optional[str] = None

    # Deep scan tools
    has_airmon_ng: bool = False
    has_airodump_ng: bool = False
    has_monitor_capable_interface: bool = False
    monitor_interface: Optional[str] = None

    # Issues
    issues: list[str] = field(default_factory=list)

    @property
    def can_quick_scan(self) -> bool:
        """True when any quick-scan tool is present and an interface exists."""
        quick_tools = (
            self.has_nmcli,
            self.has_iw,
            self.has_iwlist,
            self.has_airport,
        )
        return any(quick_tools) and len(self.interfaces) > 0

    @property
    def can_deep_scan(self) -> bool:
        """True when both aircrack tools, a monitor-capable interface and root are all present."""
        requirements = (
            self.has_airmon_ng,
            self.has_airodump_ng,
            self.has_monitor_capable_interface,
            self.is_root,
        )
        return all(requirements)

    def to_dict(self) -> dict:
        """Return a JSON-friendly mapping of the capability report.

        ``'available'`` mirrors ``'can_quick_scan'``; the individual tool
        flags are grouped under the ``'tools'`` key.
        """
        quick_ok = self.can_quick_scan
        tool_flags = {
            'nmcli': self.has_nmcli,
            'iw': self.has_iw,
            'iwlist': self.has_iwlist,
            'airport': self.has_airport,
            'airmon_ng': self.has_airmon_ng,
            'airodump_ng': self.has_airodump_ng,
        }
        return {
            # Status
            'available': quick_ok,
            'can_quick_scan': quick_ok,
            'can_deep_scan': self.can_deep_scan,

            # Platform
            'platform': self.platform,
            'is_root': self.is_root,

            # Interfaces
            'interfaces': self.interfaces,
            'default_interface': self.default_interface,

            # Quick scan tools
            'tools': tool_flags,
            'preferred_quick_tool': self.preferred_quick_tool,

            # Deep scan
            'has_monitor_capable_interface': self.has_monitor_capable_interface,
            'monitor_interface': self.monitor_interface,

            # Issues
            'issues': self.issues,
        }
|