mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
- Fix SSE fanout thread AttributeError when source queue is None during interpreter shutdown by snapshotting to local variable with null guard - Fix branded "i" logo rendering oversized on first page load (FOUC) by adding inline width/height to SVG elements across 10 templates - Bump version to 2.26.0 in config.py, pyproject.toml, and CHANGELOG.md Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
241 lines
7.9 KiB
Python
241 lines
7.9 KiB
Python
"""Settings management routes."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
from flask import Blueprint, Response, jsonify, request
|
|
|
|
from utils.database import (
|
|
delete_setting,
|
|
get_all_settings,
|
|
get_correlations,
|
|
get_setting,
|
|
set_setting,
|
|
)
|
|
from utils.logging import get_logger
|
|
from utils.responses import api_error, api_success
|
|
|
|
logger = get_logger('intercept.settings')
|
|
|
|
settings_bp = Blueprint('settings', __name__, url_prefix='/settings')
|
|
|
|
|
|
@settings_bp.route('', methods=['GET'])
|
|
def get_settings() -> Response:
|
|
"""Get all settings."""
|
|
try:
|
|
settings = get_all_settings()
|
|
return api_success(data={'settings': settings})
|
|
except Exception as e:
|
|
logger.error(f"Error getting settings: {e}")
|
|
return api_error(str(e), 500)
|
|
|
|
|
|
@settings_bp.route('', methods=['POST'])
|
|
def save_settings() -> Response:
|
|
"""Save one or more settings."""
|
|
data = request.json or {}
|
|
|
|
if not data:
|
|
return api_error('No settings provided', 400)
|
|
|
|
try:
|
|
saved = []
|
|
for key, value in data.items():
|
|
# Validate key (alphanumeric, underscores, dots, hyphens)
|
|
if not key or not all(c.isalnum() or c in '_.-' for c in key):
|
|
continue
|
|
|
|
set_setting(key, value)
|
|
saved.append(key)
|
|
|
|
return api_success(data={'saved': saved})
|
|
except Exception as e:
|
|
logger.error(f"Error saving settings: {e}")
|
|
return api_error(str(e), 500)
|
|
|
|
|
|
@settings_bp.route('/<key>', methods=['GET'])
|
|
def get_single_setting(key: str) -> Response:
|
|
"""Get a single setting by key."""
|
|
try:
|
|
value = get_setting(key)
|
|
if value is None:
|
|
return jsonify({
|
|
'status': 'not_found',
|
|
'key': key
|
|
}), 404
|
|
|
|
return api_success(data={'key': key, 'value': value})
|
|
except Exception as e:
|
|
logger.error(f"Error getting setting {key}: {e}")
|
|
return api_error(str(e), 500)
|
|
|
|
|
|
@settings_bp.route('/<key>', methods=['PUT'])
|
|
def update_single_setting(key: str) -> Response:
|
|
"""Update a single setting."""
|
|
data = request.json or {}
|
|
value = data.get('value')
|
|
|
|
if value is None and 'value' not in data:
|
|
return api_error('Value is required', 400)
|
|
|
|
try:
|
|
set_setting(key, value)
|
|
return api_success(data={'key': key, 'value': value})
|
|
except Exception as e:
|
|
logger.error(f"Error updating setting {key}: {e}")
|
|
return api_error(str(e), 500)
|
|
|
|
|
|
@settings_bp.route('/<key>', methods=['DELETE'])
|
|
def delete_single_setting(key: str) -> Response:
|
|
"""Delete a setting."""
|
|
try:
|
|
deleted = delete_setting(key)
|
|
if deleted:
|
|
return api_success(data={'key': key, 'deleted': True})
|
|
else:
|
|
return jsonify({
|
|
'status': 'not_found',
|
|
'key': key
|
|
}), 404
|
|
except Exception as e:
|
|
logger.error(f"Error deleting setting {key}: {e}")
|
|
return api_error(str(e), 500)
|
|
|
|
|
|
# =============================================================================
|
|
# Device Correlation Endpoints
|
|
# =============================================================================
|
|
|
|
@settings_bp.route('/correlations', methods=['GET'])
|
|
def get_device_correlations() -> Response:
|
|
"""Get device correlations between WiFi and Bluetooth."""
|
|
min_confidence = request.args.get('min_confidence', 0.5, type=float)
|
|
|
|
try:
|
|
correlations = get_correlations(min_confidence)
|
|
return api_success(data={'correlations': correlations})
|
|
except Exception as e:
|
|
logger.error(f"Error getting correlations: {e}")
|
|
return api_error(str(e), 500)
|
|
|
|
|
|
# =============================================================================
|
|
# RTL-SDR DVB Driver Management
|
|
# =============================================================================
|
|
|
|
DVB_MODULES = ['dvb_usb_rtl28xxu', 'rtl2832_sdr', 'rtl2832', 'rtl2830', 'r820t']
|
|
BLACKLIST_FILE = '/etc/modprobe.d/blacklist-rtlsdr.conf'
|
|
|
|
|
|
@settings_bp.route('/rtlsdr/driver-status', methods=['GET'])
|
|
def check_dvb_driver_status() -> Response:
|
|
"""Check if DVB kernel drivers are loaded and blocking RTL-SDR devices."""
|
|
if sys.platform != 'linux':
|
|
return jsonify({
|
|
'status': 'success',
|
|
'platform': sys.platform,
|
|
'issue_detected': False,
|
|
'message': 'DVB driver conflict only affects Linux systems'
|
|
})
|
|
|
|
# Check which DVB modules are currently loaded
|
|
loaded_modules = []
|
|
try:
|
|
result = subprocess.run(['lsmod'], capture_output=True, text=True, timeout=5)
|
|
lsmod_output = result.stdout
|
|
for mod in DVB_MODULES:
|
|
if mod in lsmod_output:
|
|
loaded_modules.append(mod)
|
|
except Exception as e:
|
|
logger.warning(f"Could not check loaded modules: {e}")
|
|
|
|
# Check if blacklist file exists
|
|
blacklist_exists = os.path.exists(BLACKLIST_FILE)
|
|
|
|
# Check blacklist file contents
|
|
blacklist_contents = []
|
|
if blacklist_exists:
|
|
try:
|
|
with open(BLACKLIST_FILE) as f:
|
|
blacklist_contents = [line.strip() for line in f if line.strip() and not line.startswith('#')]
|
|
except Exception:
|
|
pass
|
|
|
|
issue_detected = len(loaded_modules) > 0
|
|
|
|
return jsonify({
|
|
'status': 'success',
|
|
'platform': 'linux',
|
|
'issue_detected': issue_detected,
|
|
'loaded_modules': loaded_modules,
|
|
'blacklist_file_exists': blacklist_exists,
|
|
'blacklist_contents': blacklist_contents,
|
|
'message': 'DVB drivers are claiming RTL-SDR devices' if issue_detected else 'No DVB driver conflict detected'
|
|
})
|
|
|
|
|
|
@settings_bp.route('/rtlsdr/blacklist-drivers', methods=['POST'])
|
|
def blacklist_dvb_drivers() -> Response:
|
|
"""Blacklist DVB kernel drivers to prevent them from claiming RTL-SDR devices."""
|
|
if sys.platform != 'linux':
|
|
return api_error('This feature is only available on Linux', 400)
|
|
|
|
# Check if we have permission (need to be running as root or with sudo)
|
|
if os.geteuid() != 0:
|
|
return api_error('Root privileges required. Run the app with sudo or manually run: sudo modprobe -r dvb_usb_rtl28xxu rtl2832_sdr rtl2832 r820t', 403)
|
|
|
|
errors = []
|
|
successes = []
|
|
|
|
# Create blacklist file if it doesn't exist
|
|
if not os.path.exists(BLACKLIST_FILE):
|
|
try:
|
|
blacklist_content = """# RTL-SDR blacklist - prevents DVB drivers from claiming RTL-SDR devices
|
|
# Created by INTERCEPT
|
|
blacklist dvb_usb_rtl28xxu
|
|
blacklist rtl2832
|
|
blacklist rtl2830
|
|
blacklist r820t
|
|
"""
|
|
with open(BLACKLIST_FILE, 'w') as f:
|
|
f.write(blacklist_content)
|
|
successes.append(f'Created {BLACKLIST_FILE}')
|
|
except Exception as e:
|
|
errors.append(f'Failed to create blacklist file: {e}')
|
|
|
|
# Unload the modules
|
|
for mod in DVB_MODULES:
|
|
try:
|
|
result = subprocess.run(
|
|
['modprobe', '-r', mod],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
successes.append(f'Unloaded module: {mod}')
|
|
# returncode != 0 is OK - module might not be loaded
|
|
except Exception as e:
|
|
logger.warning(f"Could not unload {mod}: {e}")
|
|
|
|
if errors:
|
|
return jsonify({
|
|
'status': 'partial',
|
|
'message': 'Some operations failed. Please unplug and replug your RTL-SDR device.',
|
|
'successes': successes,
|
|
'errors': errors
|
|
})
|
|
|
|
return jsonify({
|
|
'status': 'success',
|
|
'message': 'DVB drivers blacklisted. Please unplug and replug your RTL-SDR device.',
|
|
'successes': successes
|
|
})
|