"""Input validation utilities for API endpoints.""" from __future__ import annotations import re from typing import Any def escape_html(text: str | None) -> str: """Escape HTML special characters to prevent XSS attacks.""" if text is None: return '' if not isinstance(text, str): text = str(text) html_escape_table = { '&': '&', '<': '<', '>': '>', '"': '"', "'": ''', } return ''.join(html_escape_table.get(c, c) for c in text) def validate_latitude(lat: Any) -> float: """Validate and return latitude value.""" try: lat_float = float(lat) if not -90 <= lat_float <= 90: raise ValueError(f"Latitude must be between -90 and 90, got {lat_float}") return lat_float except (ValueError, TypeError) as e: raise ValueError(f"Invalid latitude: {lat}") from e def validate_longitude(lon: Any) -> float: """Validate and return longitude value.""" try: lon_float = float(lon) if not -180 <= lon_float <= 180: raise ValueError(f"Longitude must be between -180 and 180, got {lon_float}") return lon_float except (ValueError, TypeError) as e: raise ValueError(f"Invalid longitude: {lon}") from e def validate_frequency(freq: Any, min_mhz: float = 24.0, max_mhz: float = 1766.0) -> float: """Validate and return frequency in MHz.""" try: freq_float = float(freq) if not min_mhz <= freq_float <= max_mhz: raise ValueError(f"Frequency must be between {min_mhz} and {max_mhz} MHz, got {freq_float}") return freq_float except (ValueError, TypeError) as e: raise ValueError(f"Invalid frequency: {freq}") from e def validate_device_index(device: Any) -> int: """Validate and return RTL-SDR device index.""" try: device_int = int(device) if not 0 <= device_int <= 255: raise ValueError(f"Device index must be between 0 and 255, got {device_int}") return device_int except (ValueError, TypeError) as e: raise ValueError(f"Invalid device index: {device}") from e def validate_rtl_tcp_host(host: Any) -> str: """Validate and return rtl_tcp server hostname or IP address.""" if not host or not isinstance(host, str): raise ValueError("rtl_tcp host is required") host = host.strip() if not host: raise ValueError("rtl_tcp host cannot be empty") # Allow alphanumeric, dots, hyphens (valid for hostnames and IPs) if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9.\-]*$', host): raise ValueError(f"Invalid rtl_tcp host: {host}") if len(host) > 253: raise ValueError("rtl_tcp host too long") return host def validate_rtl_tcp_port(port: Any) -> int: """Validate and return rtl_tcp server port.""" try: port_int = int(port) if not 1 <= port_int <= 65535: raise ValueError(f"Port must be between 1 and 65535, got {port_int}") return port_int except (ValueError, TypeError) as e: raise ValueError(f"Invalid rtl_tcp port: {port}") from e def validate_gain(gain: Any) -> float: """Validate and return gain value.""" try: gain_float = float(gain) if not 0 <= gain_float <= 50: raise ValueError(f"Gain must be between 0 and 50, got {gain_float}") return gain_float except (ValueError, TypeError) as e: raise ValueError(f"Invalid gain: {gain}") from e def validate_ppm(ppm: Any) -> int: """Validate and return PPM correction value.""" try: ppm_int = int(ppm) if not -1000 <= ppm_int <= 1000: raise ValueError(f"PPM must be between -1000 and 1000, got {ppm_int}") return ppm_int except (ValueError, TypeError) as e: raise ValueError(f"Invalid PPM: {ppm}") from e def validate_hours(hours: Any, min_hours: int = 1, max_hours: int = 168) -> int: """Validate and return hours value (for satellite predictions).""" try: hours_int = int(hours) if not min_hours <= hours_int <= max_hours: raise ValueError(f"Hours must be between {min_hours} and {max_hours}, got {hours_int}") return hours_int except (ValueError, TypeError) as e: raise ValueError(f"Invalid hours: {hours}") from e def validate_elevation(elevation: Any) -> float: """Validate and return elevation angle.""" try: el_float = float(elevation) if not 0 <= el_float <= 90: raise ValueError(f"Elevation must be between 0 and 90, got {el_float}") return el_float except (ValueError, TypeError) as e: raise ValueError(f"Invalid elevation: {elevation}") from e def validate_wifi_channel(channel: Any) -> int: """Validate and return WiFi channel.""" try: ch_int = int(channel) # Valid WiFi channels: 1-14 (2.4GHz), 32-177 (5GHz) valid_2ghz = 1 <= ch_int <= 14 valid_5ghz = 32 <= ch_int <= 177 if not (valid_2ghz or valid_5ghz): raise ValueError(f"Invalid WiFi channel: {ch_int}") return ch_int except (ValueError, TypeError) as e: raise ValueError(f"Invalid WiFi channel: {channel}") from e def validate_mac_address(mac: Any) -> str: """Validate and return MAC address.""" if not mac or not isinstance(mac, str): raise ValueError("MAC address is required") mac = mac.upper().strip() if not re.match(r'^([0-9A-F]{2}:){5}[0-9A-F]{2}$', mac): raise ValueError(f"Invalid MAC address format: {mac}") return mac def validate_positive_int(value: Any, name: str = 'value', max_val: int | None = None) -> int: """Validate and return a positive integer.""" try: val_int = int(value) if val_int < 0: raise ValueError(f"{name} must be positive, got {val_int}") if max_val is not None and val_int > max_val: raise ValueError(f"{name} must be <= {max_val}, got {val_int}") return val_int except (ValueError, TypeError) as e: raise ValueError(f"Invalid {name}: {value}") from e def sanitize_callsign(callsign: str | None) -> str: """Sanitize aircraft callsign for display.""" if not callsign: return '' # Only allow alphanumeric, dash, and space return re.sub(r'[^A-Za-z0-9\- ]', '', str(callsign))[:10] def sanitize_ssid(ssid: str | None) -> str: """Sanitize WiFi SSID for display.""" if not ssid: return '' # Escape HTML and limit length return escape_html(str(ssid)[:64]) def sanitize_device_name(name: str | None) -> str: """Sanitize Bluetooth device name for display.""" if not name: return '' # Escape HTML and limit length return escape_html(str(name)[:64]) def validate_network_interface(name: Any) -> str: """ Validate network interface name to prevent command injection. Interface names must: - Start with a letter - Contain only alphanumeric, underscore, or hyphen - Be 1-15 characters long (Linux IFNAMSIZ limit) Args: name: Interface name to validate Returns: Validated interface name Raises: ValueError: If interface name is invalid """ if not name or not isinstance(name, str): raise ValueError("Interface name is required") name = name.strip() if not name: raise ValueError("Interface name cannot be empty") if len(name) > 15: raise ValueError(f"Interface name too long (max 15 chars): {name}") # Must start with letter, contain only alphanumeric/underscore/hyphen if not re.match(r'^[a-zA-Z][a-zA-Z0-9_-]*$', name): raise ValueError(f"Invalid interface name: {name}") return name def validate_bluetooth_interface(name: Any) -> str: """ Validate Bluetooth interface name (hciX format). Args: name: Interface name to validate Returns: Validated interface name Raises: ValueError: If interface name is invalid """ if not name or not isinstance(name, str): raise ValueError("Bluetooth interface name is required") name = name.strip() # Must be hciX format where X is a number 0-255 if not re.match(r'^hci([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$', name): raise ValueError(f"Invalid Bluetooth interface name (expected hciX): {name}") return name