mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 22:59:59 -07:00
Security improvements: - Add interface name validation to prevent command injection - Fix XSS vulnerability in pager message display - Add security headers (X-Content-Type-Options, X-Frame-Options, etc.) - Disable Werkzeug debug PIN - Add security documentation Features: - Add bias-t power support for SDR dongles across all modes Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
259 lines
8.5 KiB
Python
259 lines
8.5 KiB
Python
"""Input validation utilities for API endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
|
|
def escape_html(text: str | None) -> str:
|
|
"""Escape HTML special characters to prevent XSS attacks."""
|
|
if text is None:
|
|
return ''
|
|
if not isinstance(text, str):
|
|
text = str(text)
|
|
html_escape_table = {
|
|
'&': '&',
|
|
'<': '<',
|
|
'>': '>',
|
|
'"': '"',
|
|
"'": ''',
|
|
}
|
|
return ''.join(html_escape_table.get(c, c) for c in text)
|
|
|
|
|
|
def validate_latitude(lat: Any) -> float:
|
|
"""Validate and return latitude value."""
|
|
try:
|
|
lat_float = float(lat)
|
|
if not -90 <= lat_float <= 90:
|
|
raise ValueError(f"Latitude must be between -90 and 90, got {lat_float}")
|
|
return lat_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid latitude: {lat}") from e
|
|
|
|
|
|
def validate_longitude(lon: Any) -> float:
|
|
"""Validate and return longitude value."""
|
|
try:
|
|
lon_float = float(lon)
|
|
if not -180 <= lon_float <= 180:
|
|
raise ValueError(f"Longitude must be between -180 and 180, got {lon_float}")
|
|
return lon_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid longitude: {lon}") from e
|
|
|
|
|
|
def validate_frequency(freq: Any, min_mhz: float = 24.0, max_mhz: float = 1766.0) -> float:
|
|
"""Validate and return frequency in MHz."""
|
|
try:
|
|
freq_float = float(freq)
|
|
if not min_mhz <= freq_float <= max_mhz:
|
|
raise ValueError(f"Frequency must be between {min_mhz} and {max_mhz} MHz, got {freq_float}")
|
|
return freq_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid frequency: {freq}") from e
|
|
|
|
|
|
def validate_device_index(device: Any) -> int:
|
|
"""Validate and return RTL-SDR device index."""
|
|
try:
|
|
device_int = int(device)
|
|
if not 0 <= device_int <= 255:
|
|
raise ValueError(f"Device index must be between 0 and 255, got {device_int}")
|
|
return device_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid device index: {device}") from e
|
|
|
|
|
|
def validate_rtl_tcp_host(host: Any) -> str:
|
|
"""Validate and return rtl_tcp server hostname or IP address."""
|
|
if not host or not isinstance(host, str):
|
|
raise ValueError("rtl_tcp host is required")
|
|
host = host.strip()
|
|
if not host:
|
|
raise ValueError("rtl_tcp host cannot be empty")
|
|
# Allow alphanumeric, dots, hyphens (valid for hostnames and IPs)
|
|
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9.\-]*$', host):
|
|
raise ValueError(f"Invalid rtl_tcp host: {host}")
|
|
if len(host) > 253:
|
|
raise ValueError("rtl_tcp host too long")
|
|
return host
|
|
|
|
|
|
def validate_rtl_tcp_port(port: Any) -> int:
|
|
"""Validate and return rtl_tcp server port."""
|
|
try:
|
|
port_int = int(port)
|
|
if not 1 <= port_int <= 65535:
|
|
raise ValueError(f"Port must be between 1 and 65535, got {port_int}")
|
|
return port_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid rtl_tcp port: {port}") from e
|
|
|
|
|
|
def validate_gain(gain: Any) -> float:
|
|
"""Validate and return gain value."""
|
|
try:
|
|
gain_float = float(gain)
|
|
if not 0 <= gain_float <= 50:
|
|
raise ValueError(f"Gain must be between 0 and 50, got {gain_float}")
|
|
return gain_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid gain: {gain}") from e
|
|
|
|
|
|
def validate_ppm(ppm: Any) -> int:
|
|
"""Validate and return PPM correction value."""
|
|
try:
|
|
ppm_int = int(ppm)
|
|
if not -1000 <= ppm_int <= 1000:
|
|
raise ValueError(f"PPM must be between -1000 and 1000, got {ppm_int}")
|
|
return ppm_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid PPM: {ppm}") from e
|
|
|
|
|
|
def validate_hours(hours: Any, min_hours: int = 1, max_hours: int = 168) -> int:
|
|
"""Validate and return hours value (for satellite predictions)."""
|
|
try:
|
|
hours_int = int(hours)
|
|
if not min_hours <= hours_int <= max_hours:
|
|
raise ValueError(f"Hours must be between {min_hours} and {max_hours}, got {hours_int}")
|
|
return hours_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid hours: {hours}") from e
|
|
|
|
|
|
def validate_elevation(elevation: Any) -> float:
|
|
"""Validate and return elevation angle."""
|
|
try:
|
|
el_float = float(elevation)
|
|
if not 0 <= el_float <= 90:
|
|
raise ValueError(f"Elevation must be between 0 and 90, got {el_float}")
|
|
return el_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid elevation: {elevation}") from e
|
|
|
|
|
|
def validate_wifi_channel(channel: Any) -> int:
|
|
"""Validate and return WiFi channel."""
|
|
try:
|
|
ch_int = int(channel)
|
|
# Valid WiFi channels: 1-14 (2.4GHz), 32-177 (5GHz)
|
|
valid_2ghz = 1 <= ch_int <= 14
|
|
valid_5ghz = 32 <= ch_int <= 177
|
|
if not (valid_2ghz or valid_5ghz):
|
|
raise ValueError(f"Invalid WiFi channel: {ch_int}")
|
|
return ch_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid WiFi channel: {channel}") from e
|
|
|
|
|
|
def validate_mac_address(mac: Any) -> str:
|
|
"""Validate and return MAC address."""
|
|
if not mac or not isinstance(mac, str):
|
|
raise ValueError("MAC address is required")
|
|
mac = mac.upper().strip()
|
|
if not re.match(r'^([0-9A-F]{2}:){5}[0-9A-F]{2}$', mac):
|
|
raise ValueError(f"Invalid MAC address format: {mac}")
|
|
return mac
|
|
|
|
|
|
def validate_positive_int(value: Any, name: str = 'value', max_val: int | None = None) -> int:
|
|
"""Validate and return a positive integer."""
|
|
try:
|
|
val_int = int(value)
|
|
if val_int < 0:
|
|
raise ValueError(f"{name} must be positive, got {val_int}")
|
|
if max_val is not None and val_int > max_val:
|
|
raise ValueError(f"{name} must be <= {max_val}, got {val_int}")
|
|
return val_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid {name}: {value}") from e
|
|
|
|
|
|
def sanitize_callsign(callsign: str | None) -> str:
|
|
"""Sanitize aircraft callsign for display."""
|
|
if not callsign:
|
|
return ''
|
|
# Only allow alphanumeric, dash, and space
|
|
return re.sub(r'[^A-Za-z0-9\- ]', '', str(callsign))[:10]
|
|
|
|
|
|
def sanitize_ssid(ssid: str | None) -> str:
|
|
"""Sanitize WiFi SSID for display."""
|
|
if not ssid:
|
|
return ''
|
|
# Escape HTML and limit length
|
|
return escape_html(str(ssid)[:64])
|
|
|
|
|
|
def sanitize_device_name(name: str | None) -> str:
|
|
"""Sanitize Bluetooth device name for display."""
|
|
if not name:
|
|
return ''
|
|
# Escape HTML and limit length
|
|
return escape_html(str(name)[:64])
|
|
|
|
|
|
def validate_network_interface(name: Any) -> str:
|
|
"""
|
|
Validate network interface name to prevent command injection.
|
|
|
|
Interface names must:
|
|
- Start with a letter
|
|
- Contain only alphanumeric, underscore, or hyphen
|
|
- Be 1-15 characters long (Linux IFNAMSIZ limit)
|
|
|
|
Args:
|
|
name: Interface name to validate
|
|
|
|
Returns:
|
|
Validated interface name
|
|
|
|
Raises:
|
|
ValueError: If interface name is invalid
|
|
"""
|
|
if not name or not isinstance(name, str):
|
|
raise ValueError("Interface name is required")
|
|
|
|
name = name.strip()
|
|
|
|
if not name:
|
|
raise ValueError("Interface name cannot be empty")
|
|
|
|
if len(name) > 15:
|
|
raise ValueError(f"Interface name too long (max 15 chars): {name}")
|
|
|
|
# Must start with letter, contain only alphanumeric/underscore/hyphen
|
|
if not re.match(r'^[a-zA-Z][a-zA-Z0-9_-]*$', name):
|
|
raise ValueError(f"Invalid interface name: {name}")
|
|
|
|
return name
|
|
|
|
|
|
def validate_bluetooth_interface(name: Any) -> str:
|
|
"""
|
|
Validate Bluetooth interface name (hciX format).
|
|
|
|
Args:
|
|
name: Interface name to validate
|
|
|
|
Returns:
|
|
Validated interface name
|
|
|
|
Raises:
|
|
ValueError: If interface name is invalid
|
|
"""
|
|
if not name or not isinstance(name, str):
|
|
raise ValueError("Bluetooth interface name is required")
|
|
|
|
name = name.strip()
|
|
|
|
# Must be hciX format where X is a number 0-255
|
|
if not re.match(r'^hci([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$', name):
|
|
raise ValueError(f"Invalid Bluetooth interface name (expected hciX): {name}")
|
|
|
|
return name
|