Files
intercept/utils/wifi/channel_analyzer.py
Smittix e00fbfddc1 v2.26.0: fix SSE fanout crash and branded logo FOUC
- Fix SSE fanout thread AttributeError when source queue is None during
  interpreter shutdown by snapshotting to local variable with null guard
- Fix branded "i" logo rendering oversized on first page load (FOUC) by
  adding inline width/height to SVG elements across 10 templates
- Bump version to 2.26.0 in config.py, pyproject.toml, and CHANGELOG.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 11:51:27 +00:00

290 lines
9.5 KiB
Python

"""
WiFi channel utilization analysis and recommendations.
Analyzes channel congestion based on:
- Number of access points per channel
- Number of clients per channel
- Signal strength (stronger = more interference)
- Channel overlap effects
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from .constants import (
BAND_2_4_GHZ,
BAND_5_GHZ,
CHANNEL_FREQUENCIES,
CHANNEL_RSSI_INTERFERENCE_FACTOR,
CHANNEL_WEIGHT_AP_COUNT,
CHANNEL_WEIGHT_CLIENT_COUNT,
NON_OVERLAPPING_2_4_GHZ,
NON_OVERLAPPING_5_GHZ,
get_band_from_channel,
)
from .models import ChannelRecommendation, ChannelStats, WiFiAccessPoint
logger = logging.getLogger(__name__)
# DFS channels (Dynamic Frequency Selection) - require radar detection
DFS_CHANNELS_5_GHZ = list(range(52, 65)) + list(range(100, 145))
@dataclass
class ChannelScore:
"""Internal scoring for a channel."""
channel: int
band: str
ap_count: int = 0
client_count: int = 0
rssi_sum: float = 0.0
rssi_count: int = 0
overlap_penalty: float = 0.0
class ChannelAnalyzer:
"""
Analyzes WiFi channel utilization and provides recommendations.
Uses a scoring algorithm that considers:
1. AP density (60% weight by default)
2. Client density (40% weight by default)
3. Signal strength adjustment (stronger signals = more interference)
4. Channel overlap effects for 2.4 GHz
"""
def __init__(
self,
ap_weight: float = CHANNEL_WEIGHT_AP_COUNT,
client_weight: float = CHANNEL_WEIGHT_CLIENT_COUNT,
rssi_factor: float = CHANNEL_RSSI_INTERFERENCE_FACTOR,
):
"""
Initialize channel analyzer.
Args:
ap_weight: Weight for AP count in utilization score (0-1).
client_weight: Weight for client count in utilization score (0-1).
rssi_factor: Factor for RSSI-based interference adjustment.
"""
self.ap_weight = ap_weight
self.client_weight = client_weight
self.rssi_factor = rssi_factor
def analyze(
self,
access_points: list[WiFiAccessPoint],
include_dfs: bool = False,
) -> tuple[list[ChannelStats], list[ChannelRecommendation]]:
"""
Analyze channel utilization from access point data.
Args:
access_points: List of discovered access points.
include_dfs: Whether to include DFS channels in recommendations.
Returns:
Tuple of (channel_stats, recommendations).
"""
# Build per-channel scores
scores: dict[int, ChannelScore] = {}
for ap in access_points:
if ap.channel is None:
continue
channel = ap.channel
if channel not in scores:
scores[channel] = ChannelScore(
channel=channel,
band=get_band_from_channel(channel),
)
score = scores[channel]
score.ap_count += 1
score.client_count += ap.client_count
if ap.rssi_current is not None:
score.rssi_sum += ap.rssi_current
score.rssi_count += 1
# Calculate overlap penalties for 2.4 GHz
self._calculate_overlap_penalties(scores)
# Convert to ChannelStats
channel_stats = self._build_channel_stats(scores)
# Generate recommendations
recommendations = self._generate_recommendations(
scores, access_points, include_dfs
)
return channel_stats, recommendations
def _calculate_overlap_penalties(self, scores: dict[int, ChannelScore]):
"""Calculate overlap penalties for 2.4 GHz channels."""
# In 2.4 GHz, channels overlap: each channel is 22 MHz wide
# but only 5 MHz apart. Channels 1, 6, 11 don't overlap.
#
# Channel overlap:
# - Adjacent channel (+/- 1): 75% overlap
# - 2 channels apart: 50% overlap
# - 3 channels apart: 25% overlap
# - 4 channels apart: ~12% overlap
# - 5+ channels apart: no overlap
overlap_factors = {1: 0.75, 2: 0.50, 3: 0.25, 4: 0.12}
for channel, score in scores.items():
if score.band != BAND_2_4_GHZ:
continue
penalty = 0.0
for other_channel, other_score in scores.items():
if other_channel == channel or other_score.band != BAND_2_4_GHZ:
continue
distance = abs(channel - other_channel)
if distance in overlap_factors:
# Penalty based on APs on overlapping channel
overlap = overlap_factors[distance]
penalty += other_score.ap_count * overlap * 0.5
score.overlap_penalty = penalty
def _build_channel_stats(self, scores: dict[int, ChannelScore]) -> list[ChannelStats]:
"""Build ChannelStats from scores."""
stats_list = []
for channel, score in sorted(scores.items()):
rssi_avg = None
if score.rssi_count > 0:
rssi_avg = score.rssi_sum / score.rssi_count
# Calculate utilization score
utilization = self._calculate_utilization(score)
stats = ChannelStats(
channel=channel,
band=score.band,
frequency_mhz=CHANNEL_FREQUENCIES.get(channel),
ap_count=score.ap_count,
client_count=score.client_count,
rssi_avg=rssi_avg,
utilization_score=utilization,
)
stats_list.append(stats)
return stats_list
def _calculate_utilization(self, score: ChannelScore) -> float:
"""Calculate channel utilization score (0.0-1.0, lower is better)."""
# Base score from AP and client counts
ap_score = score.ap_count * self.ap_weight
client_score = score.client_count * self.client_weight
# RSSI adjustment: stronger signals = more interference
rssi_adjustment = 0.0
if score.rssi_count > 0:
avg_rssi = score.rssi_sum / score.rssi_count
# Normalize: -30 dBm (very strong) -> 1.0, -100 dBm (weak) -> 0.0
rssi_normalized = (avg_rssi + 100) / 70
rssi_adjustment = max(0, rssi_normalized) * self.rssi_factor * score.ap_count
# Overlap penalty (already scaled)
overlap_score = score.overlap_penalty
# Total score
total = ap_score + client_score + rssi_adjustment + overlap_score
# Normalize to 0.0-1.0 range (cap at reasonable maximum)
normalized = min(1.0, total / 10.0)
return normalized
def _generate_recommendations(
self,
scores: dict[int, ChannelScore],
access_points: list[WiFiAccessPoint],
include_dfs: bool,
) -> list[ChannelRecommendation]:
"""Generate channel recommendations."""
recommendations = []
# Score all non-overlapping channels
candidate_channels = []
# 2.4 GHz non-overlapping
for channel in NON_OVERLAPPING_2_4_GHZ:
candidate_channels.append((channel, BAND_2_4_GHZ, False))
# 5 GHz non-DFS
for channel in NON_OVERLAPPING_5_GHZ:
is_dfs = channel in DFS_CHANNELS_5_GHZ
if is_dfs and not include_dfs:
continue
candidate_channels.append((channel, BAND_5_GHZ, is_dfs))
# 5 GHz DFS channels (if requested)
if include_dfs:
for channel in DFS_CHANNELS_5_GHZ:
if channel not in NON_OVERLAPPING_5_GHZ:
candidate_channels.append((channel, BAND_5_GHZ, True))
# Score each candidate
for channel, band, is_dfs in candidate_channels:
score = scores.get(channel)
if score:
utilization = self._calculate_utilization(score)
ap_count = score.ap_count
else:
utilization = 0.0
ap_count = 0
# Build reason string
if ap_count == 0:
reason = "No APs detected - clear channel"
elif ap_count == 1:
reason = "1 AP on channel"
else:
reason = f"{ap_count} APs on channel"
if is_dfs:
reason += " (DFS - radar detection required)"
recommendations.append(ChannelRecommendation(
channel=channel,
band=band,
score=utilization,
reason=reason,
is_dfs=is_dfs,
))
# Sort by score (lower is better), then prefer non-DFS
recommendations.sort(key=lambda r: (r.score, r.is_dfs, r.channel))
return recommendations
# Module-level convenience function
def analyze_channels(
access_points: list[WiFiAccessPoint],
include_dfs: bool = False,
) -> tuple[list[ChannelStats], list[ChannelRecommendation]]:
"""
Analyze channel utilization and get recommendations.
Args:
access_points: List of discovered access points.
include_dfs: Whether to include DFS channels.
Returns:
Tuple of (channel_stats, recommendations).
"""
analyzer = ChannelAnalyzer()
return analyzer.analyze(access_points, include_dfs)