mirror of
https://github.com/smittix/intercept.git
synced 2026-04-25 07:10:00 -07:00
Features: - Add rtl_tcp support for pager and sensor decoding - Connect to remote RTL-SDR via rtl_tcp server - New UI toggle and host:port inputs in sidebar - Supports rtl_fm and rtl_433 with remote devices - Add remote dump1090 support for ADS-B tracking - Connect to dump1090 SBS output on remote machine - New "Remote" checkbox with host:port in ADS-B dashboard Backend changes: - Add rtl_tcp_host/port fields to SDRDevice dataclass - Add is_network property for detecting remote devices - Update RTLSDRCommandBuilder to use rtl_tcp:host:port format - Add create_network_device() to SDRFactory - Add validate_rtl_tcp_host/port validation functions - Update pager, sensor, and adsb routes to accept remote params Note: dump1090 doesn't support rtl_tcp directly - use remote dump1090's SBS output (port 30003) for remote ADS-B tracking.
198 lines
6.9 KiB
Python
198 lines
6.9 KiB
Python
"""Input validation utilities for API endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
|
|
def escape_html(text: str | None) -> str:
|
|
"""Escape HTML special characters to prevent XSS attacks."""
|
|
if text is None:
|
|
return ''
|
|
if not isinstance(text, str):
|
|
text = str(text)
|
|
html_escape_table = {
|
|
'&': '&',
|
|
'<': '<',
|
|
'>': '>',
|
|
'"': '"',
|
|
"'": ''',
|
|
}
|
|
return ''.join(html_escape_table.get(c, c) for c in text)
|
|
|
|
|
|
def validate_latitude(lat: Any) -> float:
|
|
"""Validate and return latitude value."""
|
|
try:
|
|
lat_float = float(lat)
|
|
if not -90 <= lat_float <= 90:
|
|
raise ValueError(f"Latitude must be between -90 and 90, got {lat_float}")
|
|
return lat_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid latitude: {lat}") from e
|
|
|
|
|
|
def validate_longitude(lon: Any) -> float:
|
|
"""Validate and return longitude value."""
|
|
try:
|
|
lon_float = float(lon)
|
|
if not -180 <= lon_float <= 180:
|
|
raise ValueError(f"Longitude must be between -180 and 180, got {lon_float}")
|
|
return lon_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid longitude: {lon}") from e
|
|
|
|
|
|
def validate_frequency(freq: Any, min_mhz: float = 24.0, max_mhz: float = 1766.0) -> float:
|
|
"""Validate and return frequency in MHz."""
|
|
try:
|
|
freq_float = float(freq)
|
|
if not min_mhz <= freq_float <= max_mhz:
|
|
raise ValueError(f"Frequency must be between {min_mhz} and {max_mhz} MHz, got {freq_float}")
|
|
return freq_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid frequency: {freq}") from e
|
|
|
|
|
|
def validate_device_index(device: Any) -> int:
|
|
"""Validate and return RTL-SDR device index."""
|
|
try:
|
|
device_int = int(device)
|
|
if not 0 <= device_int <= 255:
|
|
raise ValueError(f"Device index must be between 0 and 255, got {device_int}")
|
|
return device_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid device index: {device}") from e
|
|
|
|
|
|
def validate_rtl_tcp_host(host: Any) -> str:
|
|
"""Validate and return rtl_tcp server hostname or IP address."""
|
|
if not host or not isinstance(host, str):
|
|
raise ValueError("rtl_tcp host is required")
|
|
host = host.strip()
|
|
if not host:
|
|
raise ValueError("rtl_tcp host cannot be empty")
|
|
# Allow alphanumeric, dots, hyphens (valid for hostnames and IPs)
|
|
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9.\-]*$', host):
|
|
raise ValueError(f"Invalid rtl_tcp host: {host}")
|
|
if len(host) > 253:
|
|
raise ValueError("rtl_tcp host too long")
|
|
return host
|
|
|
|
|
|
def validate_rtl_tcp_port(port: Any) -> int:
|
|
"""Validate and return rtl_tcp server port."""
|
|
try:
|
|
port_int = int(port)
|
|
if not 1 <= port_int <= 65535:
|
|
raise ValueError(f"Port must be between 1 and 65535, got {port_int}")
|
|
return port_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid rtl_tcp port: {port}") from e
|
|
|
|
|
|
def validate_gain(gain: Any) -> float:
|
|
"""Validate and return gain value."""
|
|
try:
|
|
gain_float = float(gain)
|
|
if not 0 <= gain_float <= 50:
|
|
raise ValueError(f"Gain must be between 0 and 50, got {gain_float}")
|
|
return gain_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid gain: {gain}") from e
|
|
|
|
|
|
def validate_ppm(ppm: Any) -> int:
|
|
"""Validate and return PPM correction value."""
|
|
try:
|
|
ppm_int = int(ppm)
|
|
if not -1000 <= ppm_int <= 1000:
|
|
raise ValueError(f"PPM must be between -1000 and 1000, got {ppm_int}")
|
|
return ppm_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid PPM: {ppm}") from e
|
|
|
|
|
|
def validate_hours(hours: Any, min_hours: int = 1, max_hours: int = 168) -> int:
|
|
"""Validate and return hours value (for satellite predictions)."""
|
|
try:
|
|
hours_int = int(hours)
|
|
if not min_hours <= hours_int <= max_hours:
|
|
raise ValueError(f"Hours must be between {min_hours} and {max_hours}, got {hours_int}")
|
|
return hours_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid hours: {hours}") from e
|
|
|
|
|
|
def validate_elevation(elevation: Any) -> float:
|
|
"""Validate and return elevation angle."""
|
|
try:
|
|
el_float = float(elevation)
|
|
if not 0 <= el_float <= 90:
|
|
raise ValueError(f"Elevation must be between 0 and 90, got {el_float}")
|
|
return el_float
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid elevation: {elevation}") from e
|
|
|
|
|
|
def validate_wifi_channel(channel: Any) -> int:
|
|
"""Validate and return WiFi channel."""
|
|
try:
|
|
ch_int = int(channel)
|
|
# Valid WiFi channels: 1-14 (2.4GHz), 32-177 (5GHz)
|
|
valid_2ghz = 1 <= ch_int <= 14
|
|
valid_5ghz = 32 <= ch_int <= 177
|
|
if not (valid_2ghz or valid_5ghz):
|
|
raise ValueError(f"Invalid WiFi channel: {ch_int}")
|
|
return ch_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid WiFi channel: {channel}") from e
|
|
|
|
|
|
def validate_mac_address(mac: Any) -> str:
|
|
"""Validate and return MAC address."""
|
|
if not mac or not isinstance(mac, str):
|
|
raise ValueError("MAC address is required")
|
|
mac = mac.upper().strip()
|
|
if not re.match(r'^([0-9A-F]{2}:){5}[0-9A-F]{2}$', mac):
|
|
raise ValueError(f"Invalid MAC address format: {mac}")
|
|
return mac
|
|
|
|
|
|
def validate_positive_int(value: Any, name: str = 'value', max_val: int | None = None) -> int:
|
|
"""Validate and return a positive integer."""
|
|
try:
|
|
val_int = int(value)
|
|
if val_int < 0:
|
|
raise ValueError(f"{name} must be positive, got {val_int}")
|
|
if max_val is not None and val_int > max_val:
|
|
raise ValueError(f"{name} must be <= {max_val}, got {val_int}")
|
|
return val_int
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid {name}: {value}") from e
|
|
|
|
|
|
def sanitize_callsign(callsign: str | None) -> str:
|
|
"""Sanitize aircraft callsign for display."""
|
|
if not callsign:
|
|
return ''
|
|
# Only allow alphanumeric, dash, and space
|
|
return re.sub(r'[^A-Za-z0-9\- ]', '', str(callsign))[:10]
|
|
|
|
|
|
def sanitize_ssid(ssid: str | None) -> str:
|
|
"""Sanitize WiFi SSID for display."""
|
|
if not ssid:
|
|
return ''
|
|
# Escape HTML and limit length
|
|
return escape_html(str(ssid)[:64])
|
|
|
|
|
|
def sanitize_device_name(name: str | None) -> str:
|
|
"""Sanitize Bluetooth device name for display."""
|
|
if not name:
|
|
return ''
|
|
# Escape HTML and limit length
|
|
return escape_html(str(name)[:64])
|