mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 14:50:00 -07:00
Refactor into modular structure with improvements
- Split monolithic intercept.py (15k lines) into modular structure: - routes/ - Flask blueprints for each feature - templates/ - Jinja2 HTML templates - data/ - OUI database, satellite TLEs, detection patterns - utils/ - dependencies, process management, logging - config.py - centralized configuration with env var support - Add type hints to function signatures - Replace bare except clauses with specific exceptions - Add proper logging module (replaces print statements) - Add environment variable support (INTERCEPT_* prefix) - Add test suite with pytest - Add Dockerfile for containerized deployment - Add pyproject.toml with ruff/black/mypy config - Add requirements-dev.txt for development dependencies 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
80
utils/process.py
Normal file
80
utils/process.py
Normal file
@@ -0,0 +1,80 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from .dependencies import check_tool
|
||||
|
||||
|
||||
def cleanup_stale_processes() -> None:
|
||||
"""Kill any stale processes from previous runs (but not system services)."""
|
||||
# Note: dump1090 is NOT included here as users may run it as a system service
|
||||
processes_to_kill = ['rtl_adsb', 'rtl_433', 'multimon-ng', 'rtl_fm']
|
||||
for proc_name in processes_to_kill:
|
||||
try:
|
||||
subprocess.run(['pkill', '-9', proc_name], capture_output=True)
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
|
||||
|
||||
def is_valid_mac(mac: str | None) -> bool:
|
||||
"""Validate MAC address format."""
|
||||
if not mac:
|
||||
return False
|
||||
return bool(re.match(r'^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$', mac))
|
||||
|
||||
|
||||
def is_valid_channel(channel: str | int | None) -> bool:
|
||||
"""Validate WiFi channel number."""
|
||||
try:
|
||||
ch = int(channel) # type: ignore[arg-type]
|
||||
return 1 <= ch <= 200
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
|
||||
def detect_devices() -> list[dict[str, Any]]:
|
||||
"""Detect RTL-SDR devices."""
|
||||
devices: list[dict[str, Any]] = []
|
||||
|
||||
if not check_tool('rtl_test'):
|
||||
return devices
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['rtl_test', '-t'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
output = result.stderr + result.stdout
|
||||
|
||||
# Parse device info
|
||||
device_pattern = r'(\d+):\s+(.+?)(?:,\s*SN:\s*(\S+))?$'
|
||||
|
||||
for line in output.split('\n'):
|
||||
line = line.strip()
|
||||
match = re.match(device_pattern, line)
|
||||
if match:
|
||||
devices.append({
|
||||
'index': int(match.group(1)),
|
||||
'name': match.group(2).strip().rstrip(','),
|
||||
'serial': match.group(3) or 'N/A'
|
||||
})
|
||||
|
||||
if not devices:
|
||||
found_match = re.search(r'Found (\d+) device', output)
|
||||
if found_match:
|
||||
count = int(found_match.group(1))
|
||||
for i in range(count):
|
||||
devices.append({
|
||||
'index': i,
|
||||
'name': f'RTL-SDR Device {i}',
|
||||
'serial': 'Unknown'
|
||||
})
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return devices
|
||||
Reference in New Issue
Block a user