""" Unified WiFi scanner coordinator. Provides dual-mode scanning: - Quick Scan: Uses system tools (nmcli, iw, iwlist, airport) without monitor mode - Deep Scan: Uses airodump-ng with monitor mode for clients and probes """ from __future__ import annotations import logging import os import platform import queue import re import shutil import subprocess import threading import time from datetime import datetime from pathlib import Path from typing import Callable, Generator, Optional from .constants import ( DEFAULT_QUICK_SCAN_TIMEOUT, SCAN_MODE_QUICK, SCAN_MODE_DEEP, QUICK_SCAN_TOOLS_LINUX, QUICK_SCAN_TOOLS_DARWIN, TOOL_TIMEOUT_QUICK, TOOL_TIMEOUT_DETECT, NETWORK_STALE_TIMEOUT, MAX_RSSI_SAMPLES, WIFI_EMA_ALPHA, get_signal_band, get_proximity_band, get_vendor_from_mac, ) from .models import ( WiFiAccessPoint, WiFiClient, WiFiProbeRequest, WiFiScanResult, WiFiScanStatus, WiFiCapabilities, WiFiObservation, ChannelStats, ChannelRecommendation, ) logger = logging.getLogger(__name__) # Global scanner instance _scanner_instance: Optional['UnifiedWiFiScanner'] = None _scanner_lock = threading.Lock() class UnifiedWiFiScanner: """ Unified WiFi scanner with Quick Scan and Deep Scan modes. Quick Scan: One-shot scan using system tools Deep Scan: Continuous monitoring with airodump-ng """ def __init__(self, interface: Optional[str] = None): """ Initialize WiFi scanner. Args: interface: WiFi interface name (e.g., 'wlan0', 'en0'). """ self._interface = interface self._lock = threading.Lock() # State self._status = WiFiScanStatus() self._capabilities: Optional[WiFiCapabilities] = None # Discovered entities self._access_points: dict[str, WiFiAccessPoint] = {} # bssid -> AP self._clients: dict[str, WiFiClient] = {} # mac -> Client self._probe_requests: list[WiFiProbeRequest] = [] # Deep scan process self._deep_scan_process: Optional[subprocess.Popen] = None self._deep_scan_thread: Optional[threading.Thread] = None self._deep_scan_stop_event = threading.Event() # Event queue for SSE streaming self._event_queue: queue.Queue = queue.Queue(maxsize=1000) # Callbacks self._on_network_updated: Optional[Callable[[WiFiAccessPoint], None]] = None self._on_client_updated: Optional[Callable[[WiFiClient], None]] = None self._on_probe_request: Optional[Callable[[WiFiProbeRequest], None]] = None # Baseline tracking self._baseline_networks: set[str] = set() # BSSIDs in baseline self._baseline_set_at: Optional[datetime] = None # ========================================================================= # Properties # ========================================================================= @property def is_scanning(self) -> bool: """Check if currently scanning.""" return self._status.is_scanning @property def access_points(self) -> list[WiFiAccessPoint]: """Get all discovered access points.""" with self._lock: return list(self._access_points.values()) @property def clients(self) -> list[WiFiClient]: """Get all discovered clients.""" with self._lock: return list(self._clients.values()) @property def probe_requests(self) -> list[WiFiProbeRequest]: """Get all captured probe requests.""" with self._lock: return list(self._probe_requests) # ========================================================================= # Capability Detection # ========================================================================= def check_capabilities(self) -> WiFiCapabilities: """ Check WiFi scanning capabilities on this system. Returns: WiFiCapabilities with available tools and interfaces. """ caps = WiFiCapabilities() caps.platform = platform.system().lower() caps.is_root = os.geteuid() == 0 if hasattr(os, 'geteuid') else False # Detect tools caps.has_nmcli = shutil.which('nmcli') is not None caps.has_iw = shutil.which('iw') is not None caps.has_iwlist = shutil.which('iwlist') is not None caps.has_airmon_ng = shutil.which('airmon-ng') is not None caps.has_airodump_ng = shutil.which('airodump-ng') is not None # macOS airport tool airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport' caps.has_airport = os.path.exists(airport_path) # Determine preferred quick scan tool if caps.platform == 'darwin': if caps.has_airport: caps.preferred_quick_tool = 'airport' else: # Linux if caps.has_nmcli: caps.preferred_quick_tool = 'nmcli' elif caps.has_iw: caps.preferred_quick_tool = 'iw' elif caps.has_iwlist: caps.preferred_quick_tool = 'iwlist' # Detect interfaces caps.interfaces = self._detect_interfaces() if caps.interfaces: caps.default_interface = caps.interfaces[0].get('name') # Check for monitor-capable interface for iface in caps.interfaces: if iface.get('supports_monitor', False): caps.has_monitor_capable_interface = True caps.monitor_interface = iface.get('name') break # Build issues list if not caps.interfaces: caps.issues.append('No WiFi interfaces detected') if not caps.can_quick_scan: caps.issues.append('No quick scan tools available') if not caps.can_deep_scan: if not caps.has_airodump_ng: caps.issues.append('airodump-ng not installed (install aircrack-ng)') if not caps.is_root: caps.issues.append('Root privileges required for deep scan') if not caps.has_monitor_capable_interface: caps.issues.append('No monitor mode capable interface') self._capabilities = caps return caps def _detect_interfaces(self) -> list[dict]: """Detect available WiFi interfaces.""" interfaces = [] if platform.system() == 'Darwin': # macOS: Use networksetup try: result = subprocess.run( ['networksetup', '-listallhardwareports'], capture_output=True, text=True, timeout=TOOL_TIMEOUT_DETECT, ) current_port = None for line in result.stdout.splitlines(): if line.startswith('Hardware Port:'): current_port = line.split(':', 1)[1].strip() elif line.startswith('Device:') and current_port: device = line.split(':', 1)[1].strip() if 'Wi-Fi' in current_port or 'wi-fi' in current_port.lower(): interfaces.append({ 'name': device, 'description': current_port, 'supports_monitor': False, # macOS generally doesn't support monitor mode }) current_port = None except Exception as e: logger.debug(f"Error detecting macOS interfaces: {e}") else: # Linux: Use /sys/class/net or iw try: net_path = Path('/sys/class/net') if net_path.exists(): for iface_path in net_path.iterdir(): wireless_path = iface_path / 'wireless' if wireless_path.exists(): iface_name = iface_path.name supports_monitor = self._check_monitor_support(iface_name) interfaces.append({ 'name': iface_name, 'description': f'Wireless interface {iface_name}', 'supports_monitor': supports_monitor, }) except Exception as e: logger.debug(f"Error detecting Linux interfaces: {e}") return interfaces def _check_monitor_support(self, interface: str) -> bool: """Check if interface supports monitor mode.""" try: result = subprocess.run( ['iw', interface, 'info'], capture_output=True, text=True, timeout=TOOL_TIMEOUT_DETECT, ) # Get phy name phy_match = re.search(r'wiphy (\d+)', result.stdout) if phy_match: phy = f"phy{phy_match.group(1)}" # Check supported modes result = subprocess.run( ['iw', phy, 'info'], capture_output=True, text=True, timeout=TOOL_TIMEOUT_DETECT, ) return 'monitor' in result.stdout.lower() except Exception: pass return False # ========================================================================= # Quick Scan # ========================================================================= def quick_scan( self, interface: Optional[str] = None, timeout: float = DEFAULT_QUICK_SCAN_TIMEOUT, ) -> WiFiScanResult: """ Perform a quick one-shot WiFi scan using system tools. Args: interface: Interface to scan on (uses default if None). timeout: Scan timeout in seconds. Returns: WiFiScanResult with discovered networks. """ result = WiFiScanResult(scan_mode=SCAN_MODE_QUICK, started_at=datetime.now()) # Get capabilities if not cached if not self._capabilities: self.check_capabilities() # Determine interface iface = interface or self._interface or self._capabilities.default_interface if not iface: result.error = "No WiFi interface available" result.is_complete = True return result result.interface = iface # Select and run parser based on platform/tools observations = [] tool_used = None try: if self._capabilities.platform == 'darwin': if self._capabilities.has_airport: observations = self._scan_with_airport(iface, timeout) tool_used = 'airport' else: # Linux if self._capabilities.has_nmcli: observations = self._scan_with_nmcli(iface, timeout) tool_used = 'nmcli' elif self._capabilities.has_iw: observations = self._scan_with_iw(iface, timeout) tool_used = 'iw' elif self._capabilities.has_iwlist: observations = self._scan_with_iwlist(iface, timeout) tool_used = 'iwlist' if not tool_used: result.error = "No WiFi scanning tool available" result.is_complete = True return result # Process observations into access points for obs in observations: self._process_observation(obs) # Build result with self._lock: result.access_points = list(self._access_points.values()) # Generate channel stats result.channel_stats = self._calculate_channel_stats() result.recommendations = self._generate_recommendations(result.channel_stats) except subprocess.TimeoutExpired: result.error = f"Scan timed out after {timeout}s" result.warnings.append(f"Tool '{tool_used}' timed out") except Exception as e: result.error = str(e) logger.exception("Quick scan failed") result.completed_at = datetime.now() result.duration_seconds = (result.completed_at - result.started_at).total_seconds() result.is_complete = True return result def _scan_with_airport(self, interface: str, timeout: float) -> list[WiFiObservation]: """Scan using macOS airport utility.""" from .parsers.airport import parse_airport_scan airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport' result = subprocess.run( [airport_path, '-s'], capture_output=True, text=True, timeout=timeout, ) if result.returncode != 0: logger.warning(f"airport scan failed: {result.stderr}") return [] return parse_airport_scan(result.stdout) def _scan_with_nmcli(self, interface: str, timeout: float) -> list[WiFiObservation]: """Scan using NetworkManager nmcli.""" from .parsers.nmcli import parse_nmcli_scan # Trigger a rescan first subprocess.run( ['nmcli', 'device', 'wifi', 'rescan', 'ifname', interface], capture_output=True, timeout=timeout / 2, ) # Get results result = subprocess.run( ['nmcli', '-t', '-f', 'BSSID,SSID,MODE,CHAN,FREQ,RATE,SIGNAL,SECURITY', 'device', 'wifi', 'list', 'ifname', interface], capture_output=True, text=True, timeout=timeout, ) if result.returncode != 0: logger.warning(f"nmcli scan failed: {result.stderr}") return [] return parse_nmcli_scan(result.stdout) def _scan_with_iw(self, interface: str, timeout: float) -> list[WiFiObservation]: """Scan using iw.""" from .parsers.iw import parse_iw_scan result = subprocess.run( ['iw', interface, 'scan'], capture_output=True, text=True, timeout=timeout, ) if result.returncode != 0: # May need root logger.warning(f"iw scan failed: {result.stderr}") return [] return parse_iw_scan(result.stdout) def _scan_with_iwlist(self, interface: str, timeout: float) -> list[WiFiObservation]: """Scan using iwlist.""" from .parsers.iwlist import parse_iwlist_scan result = subprocess.run( ['iwlist', interface, 'scan'], capture_output=True, text=True, timeout=timeout, ) if result.returncode != 0: logger.warning(f"iwlist scan failed: {result.stderr}") return [] return parse_iwlist_scan(result.stdout) # ========================================================================= # Deep Scan (airodump-ng) # ========================================================================= def start_deep_scan( self, interface: Optional[str] = None, band: str = 'all', channel: Optional[int] = None, ) -> bool: """ Start continuous deep scan with airodump-ng. Requires monitor mode interface and root privileges. Args: interface: Monitor mode interface (e.g., 'wlan0mon'). band: Band to scan ('2.4', '5', 'all'). channel: Specific channel to monitor (None for hopping). Returns: True if scan started successfully. """ with self._lock: if self._status.is_scanning: return True # Get capabilities if not cached if not self._capabilities: self.check_capabilities() if not self._capabilities.can_deep_scan: self._status.error = "Deep scan not available: " + ", ".join(self._capabilities.issues) return False iface = interface or self._capabilities.monitor_interface if not iface: self._status.error = "No monitor mode interface available" return False # Start airodump-ng in background thread self._deep_scan_stop_event.clear() self._deep_scan_thread = threading.Thread( target=self._run_deep_scan, args=(iface, band, channel), daemon=True, ) self._deep_scan_thread.start() self._status = WiFiScanStatus( is_scanning=True, scan_mode=SCAN_MODE_DEEP, interface=iface, started_at=datetime.now(), ) self._queue_event({ 'type': 'scan_started', 'mode': SCAN_MODE_DEEP, 'interface': iface, }) return True def stop_deep_scan(self) -> bool: """ Stop the deep scan. Returns: True if scan was stopped. """ with self._lock: if not self._status.is_scanning: return True self._deep_scan_stop_event.set() if self._deep_scan_process: try: self._deep_scan_process.terminate() self._deep_scan_process.wait(timeout=5) except Exception as e: logger.warning(f"Error terminating airodump-ng: {e}") try: self._deep_scan_process.kill() except Exception: pass self._deep_scan_process = None if self._deep_scan_thread: self._deep_scan_thread.join(timeout=5) self._deep_scan_thread = None self._status.is_scanning = False self._queue_event({ 'type': 'scan_stopped', 'mode': SCAN_MODE_DEEP, }) return True def _run_deep_scan(self, interface: str, band: str, channel: Optional[int]): """Background thread for running airodump-ng.""" from .parsers.airodump import parse_airodump_csv import tempfile # Create temp directory for output files with tempfile.TemporaryDirectory(prefix='wifi_scan_') as tmpdir: output_prefix = os.path.join(tmpdir, 'scan') # Build command cmd = ['airodump-ng', '-w', output_prefix, '--output-format', 'csv'] if channel: cmd.extend(['-c', str(channel)]) elif band == '2.4': cmd.extend(['--band', 'bg']) elif band == '5': cmd.extend(['--band', 'a']) cmd.append(interface) logger.info(f"Starting airodump-ng: {' '.join(cmd)}") try: self._deep_scan_process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) csv_file = f"{output_prefix}-01.csv" # Poll CSV file for updates while not self._deep_scan_stop_event.is_set(): time.sleep(1.0) if os.path.exists(csv_file): try: networks, clients = parse_airodump_csv(csv_file) for obs in networks: self._process_observation(obs) for client_data in clients: self._process_client(client_data) # Update status with self._lock: self._status.networks_found = len(self._access_points) self._status.clients_found = len(self._clients) except Exception as e: logger.debug(f"Error parsing airodump CSV: {e}") except Exception as e: logger.exception(f"Deep scan error: {e}") self._queue_event({ 'type': 'scan_error', 'error': str(e), }) finally: self._deep_scan_process = None # ========================================================================= # Observation Processing # ========================================================================= def _process_observation(self, obs: WiFiObservation): """Process a WiFi observation and update access point data.""" with self._lock: bssid = obs.bssid.upper() if bssid in self._access_points: ap = self._access_points[bssid] self._update_access_point(ap, obs) else: ap = self._create_access_point(obs) self._access_points[bssid] = ap # Check if new (not in baseline) if self._baseline_networks and bssid not in self._baseline_networks: ap.is_new = True # Queue update event self._queue_event({ 'type': 'network_update', 'network': ap.to_summary_dict(), }) # Callback if self._on_network_updated: try: self._on_network_updated(ap) except Exception as e: logger.debug(f"Network callback error: {e}") def _create_access_point(self, obs: WiFiObservation) -> WiFiAccessPoint: """Create new access point from observation.""" now = datetime.now() ap = WiFiAccessPoint( bssid=obs.bssid.upper(), essid=obs.essid, is_hidden=obs.is_hidden, channel=obs.channel, frequency_mhz=obs.frequency_mhz, band=obs.band, width=obs.width, security=obs.security, cipher=obs.cipher, auth=obs.auth, first_seen=now, last_seen=now, seen_count=1, vendor=get_vendor_from_mac(obs.bssid), ) if obs.rssi is not None: ap.rssi_current = obs.rssi ap.rssi_samples = [(now, obs.rssi)] ap.rssi_min = obs.rssi ap.rssi_max = obs.rssi ap.rssi_median = float(obs.rssi) ap.rssi_ema = float(obs.rssi) ap.signal_band = get_signal_band(obs.rssi) ap.proximity_band = get_proximity_band(obs.rssi) ap.beacon_count = obs.beacon_count ap.data_count = obs.data_count return ap def _update_access_point(self, ap: WiFiAccessPoint, obs: WiFiObservation): """Update existing access point with new observation.""" now = datetime.now() ap.last_seen = now ap.seen_count += 1 # Update ESSID if revealed if obs.essid and ap.is_hidden: ap.revealed_essid = obs.essid self._queue_event({ 'type': 'hidden_revealed', 'bssid': ap.bssid, 'revealed_essid': obs.essid, }) # Update RSSI stats if obs.rssi is not None: ap.rssi_current = obs.rssi ap.rssi_samples.append((now, obs.rssi)) # Trim samples if len(ap.rssi_samples) > MAX_RSSI_SAMPLES: ap.rssi_samples = ap.rssi_samples[-MAX_RSSI_SAMPLES:] # Update stats rssi_values = [r for _, r in ap.rssi_samples] ap.rssi_min = min(rssi_values) ap.rssi_max = max(rssi_values) ap.rssi_median = float(sorted(rssi_values)[len(rssi_values) // 2]) # Update EMA if ap.rssi_ema is None: ap.rssi_ema = float(obs.rssi) else: ap.rssi_ema = WIFI_EMA_ALPHA * obs.rssi + (1 - WIFI_EMA_ALPHA) * ap.rssi_ema # Calculate variance if len(rssi_values) >= 2: mean = sum(rssi_values) / len(rssi_values) ap.rssi_variance = sum((r - mean) ** 2 for r in rssi_values) / len(rssi_values) ap.signal_band = get_signal_band(obs.rssi) ap.proximity_band = get_proximity_band(obs.rssi) # Update traffic counters if obs.beacon_count: ap.beacon_count = obs.beacon_count if obs.data_count: ap.data_count = obs.data_count # Calculate seen rate duration = (now - ap.first_seen).total_seconds() if duration > 0: ap.seen_rate = (ap.seen_count / duration) * 60 # per minute def _process_client(self, client_data: dict): """Process client data from airodump-ng.""" mac = client_data.get('mac', '').upper() if not mac or mac == '(not associated)': return with self._lock: if mac in self._clients: client = self._clients[mac] self._update_client(client, client_data) else: client = self._create_client(client_data) self._clients[mac] = client # Queue update event self._queue_event({ 'type': 'client_update', 'client': client.to_dict(), }) # Process probe requests probed = client_data.get('probed_essids', []) for ssid in probed: if ssid and ssid not in client.probed_ssids: client.probed_ssids.append(ssid) client.probe_timestamps[ssid] = datetime.now() probe = WiFiProbeRequest( timestamp=datetime.now(), client_mac=mac, probed_ssid=ssid, rssi=client.rssi_current, client_vendor=client.vendor, ) self._probe_requests.append(probe) self._queue_event({ 'type': 'probe_request', 'probe': probe.to_dict(), }) # Callback if self._on_client_updated: try: self._on_client_updated(client) except Exception as e: logger.debug(f"Client callback error: {e}") def _create_client(self, data: dict) -> WiFiClient: """Create new client from data.""" now = datetime.now() mac = data.get('mac', '').upper() client = WiFiClient( mac=mac, vendor=get_vendor_from_mac(mac), first_seen=now, last_seen=now, seen_count=1, ) rssi = data.get('rssi') if rssi is not None: client.rssi_current = rssi client.rssi_samples = [(now, rssi)] client.rssi_min = rssi client.rssi_max = rssi client.rssi_median = float(rssi) client.rssi_ema = float(rssi) client.signal_band = get_signal_band(rssi) client.proximity_band = get_proximity_band(rssi) bssid = data.get('bssid') if bssid and bssid != '(not associated)': client.associated_bssid = bssid.upper() client.is_associated = True # Update AP client count if client.associated_bssid in self._access_points: self._access_points[client.associated_bssid].client_count += 1 return client def _update_client(self, client: WiFiClient, data: dict): """Update existing client with new data.""" now = datetime.now() client.last_seen = now client.seen_count += 1 rssi = data.get('rssi') if rssi is not None: client.rssi_current = rssi client.rssi_samples.append((now, rssi)) if len(client.rssi_samples) > MAX_RSSI_SAMPLES: client.rssi_samples = client.rssi_samples[-MAX_RSSI_SAMPLES:] rssi_values = [r for _, r in client.rssi_samples] client.rssi_min = min(rssi_values) client.rssi_max = max(rssi_values) client.rssi_median = float(sorted(rssi_values)[len(rssi_values) // 2]) if client.rssi_ema is None: client.rssi_ema = float(rssi) else: client.rssi_ema = WIFI_EMA_ALPHA * rssi + (1 - WIFI_EMA_ALPHA) * client.rssi_ema client.signal_band = get_signal_band(rssi) client.proximity_band = get_proximity_band(rssi) # ========================================================================= # Channel Analysis # ========================================================================= def _calculate_channel_stats(self) -> list[ChannelStats]: """Calculate statistics for each channel.""" from .constants import ( CHANNELS_2_4_GHZ, CHANNELS_5_GHZ, CHANNEL_FREQUENCIES, get_band_from_channel, ) stats_map: dict[int, ChannelStats] = {} with self._lock: for ap in self._access_points.values(): if ap.channel is None: continue if ap.channel not in stats_map: stats_map[ap.channel] = ChannelStats( channel=ap.channel, band=get_band_from_channel(ap.channel), frequency_mhz=CHANNEL_FREQUENCIES.get(ap.channel), ) stats = stats_map[ap.channel] stats.ap_count += 1 stats.client_count += ap.client_count if ap.rssi_current is not None: if stats.rssi_min is None or ap.rssi_current < stats.rssi_min: stats.rssi_min = ap.rssi_current if stats.rssi_max is None or ap.rssi_current > stats.rssi_max: stats.rssi_max = ap.rssi_current # Calculate averages and utilization scores for stats in stats_map.values(): if stats.ap_count > 0: # Simple utilization score based on AP and client density from .constants import CHANNEL_WEIGHT_AP_COUNT, CHANNEL_WEIGHT_CLIENT_COUNT stats.utilization_score = ( (stats.ap_count * CHANNEL_WEIGHT_AP_COUNT) + (stats.client_count * CHANNEL_WEIGHT_CLIENT_COUNT) ) / 10.0 # Normalize stats.utilization_score = min(1.0, stats.utilization_score) return sorted(stats_map.values(), key=lambda s: s.channel) def _generate_recommendations(self, stats: list[ChannelStats]) -> list[ChannelRecommendation]: """Generate channel recommendations.""" from .constants import ( NON_OVERLAPPING_2_4_GHZ, NON_OVERLAPPING_5_GHZ, BAND_2_4_GHZ, BAND_5_GHZ, ) recommendations = [] # Create lookup for existing stats stats_map = {s.channel: s for s in stats} # Score non-overlapping channels for channel in NON_OVERLAPPING_2_4_GHZ: s = stats_map.get(channel) score = s.utilization_score if s else 0.0 recommendations.append(ChannelRecommendation( channel=channel, band=BAND_2_4_GHZ, score=score, reason=f"{s.ap_count if s else 0} APs on channel" if s else "No APs detected", is_dfs=False, )) for channel in NON_OVERLAPPING_5_GHZ: s = stats_map.get(channel) score = s.utilization_score if s else 0.0 is_dfs = 52 <= channel <= 144 recommendations.append(ChannelRecommendation( channel=channel, band=BAND_5_GHZ, score=score, reason=f"{s.ap_count if s else 0} APs on channel" + (" (DFS)" if is_dfs else ""), is_dfs=is_dfs, )) # Sort by score (lower is better) recommendations.sort(key=lambda r: (r.score, r.is_dfs)) # Add rank for i, rec in enumerate(recommendations): rec.recommendation_rank = i + 1 return recommendations # ========================================================================= # Event Streaming # ========================================================================= def _queue_event(self, event: dict): """Add event to the SSE queue.""" try: self._event_queue.put_nowait(event) except queue.Full: # Drop oldest event try: self._event_queue.get_nowait() self._event_queue.put_nowait(event) except Exception: pass def get_event_stream(self) -> Generator[dict, None, None]: """Generate events for SSE streaming.""" while True: try: event = self._event_queue.get(timeout=1.0) yield event except queue.Empty: yield {'type': 'keepalive'} except Exception: break # ========================================================================= # Baseline Management # ========================================================================= def set_baseline(self): """Mark current networks as baseline (known networks).""" with self._lock: self._baseline_networks = set(self._access_points.keys()) self._baseline_set_at = datetime.now() for ap in self._access_points.values(): ap.in_baseline = True ap.is_new = False def clear_baseline(self): """Clear the baseline.""" with self._lock: self._baseline_networks.clear() self._baseline_set_at = None for ap in self._access_points.values(): ap.in_baseline = False # ========================================================================= # Data Access # ========================================================================= def get_network(self, bssid: str) -> Optional[WiFiAccessPoint]: """Get a specific network by BSSID.""" with self._lock: return self._access_points.get(bssid.upper()) def get_client(self, mac: str) -> Optional[WiFiClient]: """Get a specific client by MAC.""" with self._lock: return self._clients.get(mac.upper()) def get_status(self) -> WiFiScanStatus: """Get current scan status.""" with self._lock: self._status.networks_found = len(self._access_points) self._status.clients_found = len(self._clients) return self._status def clear_data(self): """Clear all discovered data.""" with self._lock: self._access_points.clear() self._clients.clear() self._probe_requests.clear() # ========================================================================= # TSCM Compatibility # ========================================================================= def get_networks_legacy_format(self) -> list[dict]: """ Get networks in legacy format for TSCM compatibility. Returns list of dicts with: bssid, essid, power, channel, privacy """ with self._lock: return [ap.to_legacy_dict() for ap in self._access_points.values()] # ============================================================================= # Module-level functions # ============================================================================= def get_wifi_scanner(interface: Optional[str] = None) -> UnifiedWiFiScanner: """ Get or create the global WiFi scanner instance. Args: interface: WiFi interface name. Returns: UnifiedWiFiScanner instance. """ global _scanner_instance with _scanner_lock: if _scanner_instance is None: _scanner_instance = UnifiedWiFiScanner(interface) return _scanner_instance def reset_wifi_scanner(): """Reset the global scanner instance.""" global _scanner_instance with _scanner_lock: if _scanner_instance: _scanner_instance.stop_deep_scan() _scanner_instance.clear_data() _scanner_instance = None