Fix 5GHz WiFi scanning failures in deep scan and band detection

- Fix deep scan with 'All bands' never scanning 5GHz: band='all' now
  correctly passes --band abg to airodump-ng (previously no flag was
  added, causing airodump-ng to default to 2.4GHz-only)
- Fix APs first seen without channel info permanently stuck at
  band='unknown': _update_access_point now backfills channel, frequency,
  and band when a subsequent observation resolves the channel
- Fix legacy /wifi/scan/start combining mutually exclusive --band and -c
  flags: --band is now only added when no explicit channel list is given,
  and the interface is always placed as the last argument

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-03-20 21:50:47 +00:00
parent b20b9838d0
commit 89c7c2fb07
2 changed files with 151 additions and 137 deletions

View File

@@ -34,6 +34,7 @@ from .constants import (
SCAN_MODE_QUICK,
TOOL_TIMEOUT_DETECT,
WIFI_EMA_ALPHA,
get_band_from_channel,
get_proximity_band,
get_signal_band,
get_vendor_from_mac,
@@ -661,13 +662,13 @@ class UnifiedWiFiScanner:
# Deep Scan (airodump-ng)
# =========================================================================
def start_deep_scan(
self,
interface: str | None = None,
band: str = 'all',
channel: int | None = None,
channels: list[int] | None = None,
) -> bool:
def start_deep_scan(
self,
interface: str | None = None,
band: str = 'all',
channel: int | None = None,
channels: list[int] | None = None,
) -> bool:
"""
Start continuous deep scan with airodump-ng.
@@ -700,11 +701,11 @@ class UnifiedWiFiScanner:
# Start airodump-ng in background thread
self._deep_scan_stop_event.clear()
self._deep_scan_thread = threading.Thread(
target=self._run_deep_scan,
args=(iface, band, channel, channels),
daemon=True,
)
self._deep_scan_thread = threading.Thread(
target=self._run_deep_scan,
args=(iface, band, channel, channels),
daemon=True,
)
self._deep_scan_thread.start()
self._status = WiFiScanStatus(
@@ -725,83 +726,83 @@ class UnifiedWiFiScanner:
return True
def stop_deep_scan(self) -> bool:
"""
Stop the deep scan.
Returns:
True if scan was stopped.
"""
cleanup_process: subprocess.Popen | None = None
cleanup_thread: threading.Thread | None = None
cleanup_detector = None
with self._lock:
if not self._status.is_scanning:
return True
self._deep_scan_stop_event.set()
cleanup_process = self._deep_scan_process
cleanup_thread = self._deep_scan_thread
cleanup_detector = self._deauth_detector
self._deauth_detector = None
self._deep_scan_process = None
self._deep_scan_thread = None
self._status.is_scanning = False
self._status.error = None
self._queue_event({
'type': 'scan_stopped',
'mode': SCAN_MODE_DEEP,
})
cleanup_start = time.perf_counter()
def _finalize_stop(
process: subprocess.Popen | None,
scan_thread: threading.Thread | None,
detector,
) -> None:
if detector:
try:
detector.stop()
logger.info("Deauth detector stopped")
self._queue_event({'type': 'deauth_detector_stopped'})
except Exception as exc:
logger.error(f"Error stopping deauth detector: {exc}")
if process and process.poll() is None:
try:
process.terminate()
process.wait(timeout=1.5)
except Exception:
with contextlib.suppress(Exception):
process.kill()
if scan_thread and scan_thread.is_alive():
scan_thread.join(timeout=1.5)
elapsed_ms = (time.perf_counter() - cleanup_start) * 1000.0
logger.info(f"Deep scan stop finalized in {elapsed_ms:.1f}ms")
threading.Thread(
target=_finalize_stop,
args=(cleanup_process, cleanup_thread, cleanup_detector),
daemon=True,
name='wifi-deep-stop',
).start()
return True
def stop_deep_scan(self) -> bool:
"""
Stop the deep scan.
def _run_deep_scan(
self,
interface: str,
band: str,
channel: int | None,
channels: list[int] | None,
):
"""Background thread for running airodump-ng."""
Returns:
True if scan was stopped.
"""
cleanup_process: subprocess.Popen | None = None
cleanup_thread: threading.Thread | None = None
cleanup_detector = None
with self._lock:
if not self._status.is_scanning:
return True
self._deep_scan_stop_event.set()
cleanup_process = self._deep_scan_process
cleanup_thread = self._deep_scan_thread
cleanup_detector = self._deauth_detector
self._deauth_detector = None
self._deep_scan_process = None
self._deep_scan_thread = None
self._status.is_scanning = False
self._status.error = None
self._queue_event({
'type': 'scan_stopped',
'mode': SCAN_MODE_DEEP,
})
cleanup_start = time.perf_counter()
def _finalize_stop(
process: subprocess.Popen | None,
scan_thread: threading.Thread | None,
detector,
) -> None:
if detector:
try:
detector.stop()
logger.info("Deauth detector stopped")
self._queue_event({'type': 'deauth_detector_stopped'})
except Exception as exc:
logger.error(f"Error stopping deauth detector: {exc}")
if process and process.poll() is None:
try:
process.terminate()
process.wait(timeout=1.5)
except Exception:
with contextlib.suppress(Exception):
process.kill()
if scan_thread and scan_thread.is_alive():
scan_thread.join(timeout=1.5)
elapsed_ms = (time.perf_counter() - cleanup_start) * 1000.0
logger.info(f"Deep scan stop finalized in {elapsed_ms:.1f}ms")
threading.Thread(
target=_finalize_stop,
args=(cleanup_process, cleanup_thread, cleanup_detector),
daemon=True,
name='wifi-deep-stop',
).start()
return True
def _run_deep_scan(
self,
interface: str,
band: str,
channel: int | None,
channels: list[int] | None,
):
"""Background thread for running airodump-ng."""
import tempfile
from .parsers.airodump import parse_airodump_csv
@@ -813,43 +814,45 @@ class UnifiedWiFiScanner:
# Build command
cmd = ['airodump-ng', '-w', output_prefix, '--output-format', 'csv']
if channels:
cmd.extend(['-c', ','.join(str(c) for c in channels)])
elif channel:
cmd.extend(['-c', str(channel)])
elif band == '2.4':
cmd.extend(['--band', 'bg'])
elif band == '5':
cmd.extend(['--band', 'a'])
if channels:
cmd.extend(['-c', ','.join(str(c) for c in channels)])
elif channel:
cmd.extend(['-c', str(channel)])
elif band == '2.4':
cmd.extend(['--band', 'bg'])
elif band == '5':
cmd.extend(['--band', 'a'])
else:
cmd.extend(['--band', 'abg'])
cmd.append(interface)
logger.info(f"Starting airodump-ng: {' '.join(cmd)}")
process: subprocess.Popen | None = None
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
should_track_process = False
with self._lock:
# Only expose the process handle if this run has not been
# replaced by a newer deep scan session.
if self._status.is_scanning and not self._deep_scan_stop_event.is_set():
should_track_process = True
self._deep_scan_process = process
if not should_track_process:
try:
process.terminate()
process.wait(timeout=1.0)
except Exception:
with contextlib.suppress(Exception):
process.kill()
return
csv_file = f"{output_prefix}-01.csv"
process: subprocess.Popen | None = None
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
should_track_process = False
with self._lock:
# Only expose the process handle if this run has not been
# replaced by a newer deep scan session.
if self._status.is_scanning and not self._deep_scan_stop_event.is_set():
should_track_process = True
self._deep_scan_process = process
if not should_track_process:
try:
process.terminate()
process.wait(timeout=1.0)
except Exception:
with contextlib.suppress(Exception):
process.kill()
return
csv_file = f"{output_prefix}-01.csv"
# Poll CSV file for updates
while not self._deep_scan_stop_event.is_set():
@@ -873,16 +876,16 @@ class UnifiedWiFiScanner:
except Exception as e:
logger.debug(f"Error parsing airodump CSV: {e}")
except Exception as e:
logger.exception(f"Deep scan error: {e}")
self._queue_event({
'type': 'scan_error',
'error': str(e),
})
finally:
with self._lock:
if process is not None and self._deep_scan_process is process:
self._deep_scan_process = None
except Exception as e:
logger.exception(f"Deep scan error: {e}")
self._queue_event({
'type': 'scan_error',
'error': str(e),
})
finally:
with self._lock:
if process is not None and self._deep_scan_process is process:
self._deep_scan_process = None
# =========================================================================
# Observation Processing
@@ -958,6 +961,12 @@ class UnifiedWiFiScanner:
ap.last_seen = now
ap.seen_count += 1
# Update channel/band if now known (airodump-ng may report -1 or 0 before resolving)
if obs.channel and not ap.channel:
ap.channel = obs.channel
ap.frequency_mhz = obs.frequency_mhz
ap.band = get_band_from_channel(obs.channel)
# Update ESSID if revealed
if obs.essid and ap.is_hidden:
ap.revealed_essid = obs.essid