diff --git a/app.py b/app.py index 0c7e936..0f626c5 100644 --- a/app.py +++ b/app.py @@ -23,7 +23,7 @@ import subprocess from typing import Any -from flask import Flask, render_template, jsonify, send_file, Response +from flask import Flask, render_template, jsonify, send_file, Response, request from utils.dependencies import check_tool, check_all_dependencies, TOOL_DEPENDENCIES from utils.process import detect_devices, cleanup_stale_processes @@ -140,6 +140,110 @@ def get_dependencies() -> Response: }) +@app.route('/export/aircraft', methods=['GET']) +def export_aircraft() -> Response: + """Export aircraft data as JSON or CSV.""" + import csv + import io + + format_type = request.args.get('format', 'json').lower() + + if format_type == 'csv': + output = io.StringIO() + writer = csv.writer(output) + writer.writerow(['icao', 'callsign', 'altitude', 'speed', 'heading', 'lat', 'lon', 'squawk', 'last_seen']) + + for icao, ac in adsb_aircraft.items(): + writer.writerow([ + icao, + ac.get('callsign', ''), + ac.get('altitude', ''), + ac.get('speed', ''), + ac.get('heading', ''), + ac.get('lat', ''), + ac.get('lon', ''), + ac.get('squawk', ''), + ac.get('lastSeen', '') + ]) + + response = Response(output.getvalue(), mimetype='text/csv') + response.headers['Content-Disposition'] = 'attachment; filename=aircraft.csv' + return response + else: + return jsonify({ + 'timestamp': __import__('datetime').datetime.utcnow().isoformat(), + 'aircraft': list(adsb_aircraft.values()) + }) + + +@app.route('/export/wifi', methods=['GET']) +def export_wifi() -> Response: + """Export WiFi networks as JSON or CSV.""" + import csv + import io + + format_type = request.args.get('format', 'json').lower() + + if format_type == 'csv': + output = io.StringIO() + writer = csv.writer(output) + writer.writerow(['bssid', 'ssid', 'channel', 'signal', 'encryption', 'clients']) + + for bssid, net in wifi_networks.items(): + writer.writerow([ + bssid, + net.get('ssid', ''), + net.get('channel', ''), + net.get('signal', ''), + net.get('encryption', ''), + net.get('clients', 0) + ]) + + response = Response(output.getvalue(), mimetype='text/csv') + response.headers['Content-Disposition'] = 'attachment; filename=wifi_networks.csv' + return response + else: + return jsonify({ + 'timestamp': __import__('datetime').datetime.utcnow().isoformat(), + 'networks': list(wifi_networks.values()), + 'clients': list(wifi_clients.values()) + }) + + +@app.route('/export/bluetooth', methods=['GET']) +def export_bluetooth() -> Response: + """Export Bluetooth devices as JSON or CSV.""" + import csv + import io + + format_type = request.args.get('format', 'json').lower() + + if format_type == 'csv': + output = io.StringIO() + writer = csv.writer(output) + writer.writerow(['mac', 'name', 'rssi', 'type', 'manufacturer', 'last_seen']) + + for mac, dev in bt_devices.items(): + writer.writerow([ + mac, + dev.get('name', ''), + dev.get('rssi', ''), + dev.get('type', ''), + dev.get('manufacturer', ''), + dev.get('lastSeen', '') + ]) + + response = Response(output.getvalue(), mimetype='text/csv') + response.headers['Content-Disposition'] = 'attachment; filename=bluetooth_devices.csv' + return response + else: + return jsonify({ + 'timestamp': __import__('datetime').datetime.utcnow().isoformat(), + 'devices': list(bt_devices.values()), + 'beacons': list(bt_beacons.values()) + }) + + @app.route('/killall', methods=['POST']) def kill_all() -> Response: """Kill all decoder and WiFi processes.""" diff --git a/routes/adsb.py b/routes/adsb.py index 223e0cd..a43b331 100644 --- a/routes/adsb.py +++ b/routes/adsb.py @@ -16,6 +16,8 @@ from flask import Blueprint, jsonify, request, Response, render_template import app as app_module from utils.logging import adsb_logger as logger +from utils.validation import validate_device_index, validate_gain +from utils.sse import format_sse adsb_bp = Blueprint('adsb', __name__, url_prefix='/adsb') @@ -218,11 +220,16 @@ def start_adsb(): with app_module.adsb_lock: if adsb_using_service: - return jsonify({'status': 'already_running', 'message': 'ADS-B tracking already active'}) + return jsonify({'status': 'already_running', 'message': 'ADS-B tracking already active'}), 409 data = request.json or {} - gain = data.get('gain', '40') - device = data.get('device', '0') + + # Validate inputs + try: + gain = int(validate_gain(data.get('gain', '40'))) + device = validate_device_index(data.get('device', '0')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 # Check if dump1090 is already running externally (e.g., user started it manually) existing_service = check_dump1090_service() @@ -294,12 +301,19 @@ def stop_adsb(): def stream_adsb(): """SSE stream for ADS-B aircraft.""" def generate(): + last_keepalive = time.time() + keepalive_interval = 30.0 + while True: try: msg = app_module.adsb_queue.get(timeout=1) - yield f"data: {json.dumps(msg)}\n\n" + last_keepalive = time.time() + yield format_sse(msg) except queue.Empty: - yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" + now = time.time() + if now - last_keepalive >= keepalive_interval: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' diff --git a/routes/bluetooth.py b/routes/bluetooth.py index aba437e..aa33b2d 100644 --- a/routes/bluetooth.py +++ b/routes/bluetooth.py @@ -19,6 +19,8 @@ from flask import Blueprint, jsonify, request, Response import app as app_module from utils.dependencies import check_tool +from utils.logging import bluetooth_logger as logger +from utils.sse import format_sse from data.oui import OUI_DATABASE, load_oui_database, get_manufacturer from data.patterns import AIRTAG_PREFIXES, TILE_PREFIXES, SAMSUNG_TRACKER @@ -469,12 +471,19 @@ def get_bt_devices(): def stream_bt(): """SSE stream for Bluetooth events.""" def generate(): + last_keepalive = time.time() + keepalive_interval = 30.0 + while True: try: msg = app_module.bt_queue.get(timeout=1) - yield f"data: {json.dumps(msg)}\n\n" + last_keepalive = time.time() + yield format_sse(msg) except queue.Empty: - yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" + now = time.time() + if now - last_keepalive >= keepalive_interval: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' diff --git a/routes/iridium.py b/routes/iridium.py index 4cfdde6..f751260 100644 --- a/routes/iridium.py +++ b/routes/iridium.py @@ -1,4 +1,9 @@ -"""Iridium monitoring routes.""" +"""Iridium monitoring routes. + +NOTE: This module is currently in DEMO MODE. The burst detection generates +simulated data for demonstration purposes. Real Iridium decoding requires +gr-iridium or iridium-toolkit which are not yet integrated. +""" from __future__ import annotations @@ -16,24 +21,42 @@ from flask import Blueprint, jsonify, request, Response import app as app_module from utils.logging import iridium_logger as logger +from utils.validation import validate_frequency, validate_device_index, validate_gain +from utils.sse import format_sse iridium_bp = Blueprint('iridium', __name__, url_prefix='/iridium') +# Flag indicating this is demo mode (simulated data) +DEMO_MODE = True + def monitor_iridium(process): - """Monitor Iridium capture and detect bursts.""" + """ + Monitor Iridium capture and detect bursts. + + NOTE: Currently generates SIMULATED data for demonstration. + Real Iridium decoding is not yet implemented. + """ try: burst_count = 0 + # Send initial demo mode warning + app_module.satellite_queue.put({ + 'type': 'info', + 'message': '⚠️ DEMO MODE: Generating simulated Iridium bursts for demonstration' + }) + while process.poll() is None: data = process.stdout.read(1024) if data: if len(data) > 0 and burst_count < 100: + # DEMO: Generate simulated bursts (1% chance per read) if random.random() < 0.01: burst = { 'type': 'burst', + 'demo': True, # Flag as demo data 'time': datetime.now().strftime('%H:%M:%S.%f')[:-3], 'frequency': f"{1616 + random.random() * 10:.3f}", - 'data': f"Frame data (simulated) - Burst #{burst_count + 1}" + 'data': f"[SIMULATED] Frame data - Burst #{burst_count + 1}" } app_module.satellite_queue.put(burst) app_module.iridium_bursts.append(burst) @@ -47,28 +70,44 @@ def monitor_iridium(process): @iridium_bp.route('/tools') def check_iridium_tools(): """Check for Iridium decoding tools.""" - has_tool = shutil.which('iridium-extractor') is not None or shutil.which('iridium-parser') is not None - return jsonify({'available': has_tool}) + has_iridium = shutil.which('iridium-extractor') is not None or shutil.which('iridium-parser') is not None + has_rtl = shutil.which('rtl_fm') is not None + return jsonify({ + 'available': has_iridium or has_rtl, + 'demo_mode': DEMO_MODE, + 'message': 'Demo mode active - generating simulated data' if DEMO_MODE else None + }) @iridium_bp.route('/start', methods=['POST']) def start_iridium(): - """Start Iridium burst capture.""" + """Start Iridium burst capture (DEMO MODE - simulated data).""" with app_module.satellite_lock: if app_module.satellite_process and app_module.satellite_process.poll() is None: - return jsonify({'status': 'error', 'message': 'Iridium capture already running'}) + return jsonify({'status': 'error', 'message': 'Iridium capture already running'}), 409 + + data = request.json or {} + + # Validate inputs + try: + freq = validate_frequency(data.get('freq', '1626.0'), min_mhz=1610.0, max_mhz=1650.0) + gain = validate_gain(data.get('gain', '40')) + device = validate_device_index(data.get('device', '0')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 - data = request.json - freq = data.get('freq', '1626.0') - gain = data.get('gain', '40') sample_rate = data.get('sampleRate', '2.048e6') - device = data.get('device', '0') + # Validate sample rate format + try: + float(sample_rate.replace('e', 'E')) + except (ValueError, AttributeError): + return jsonify({'status': 'error', 'message': 'Invalid sample rate format'}), 400 if not shutil.which('iridium-extractor') and not shutil.which('rtl_fm'): return jsonify({ 'status': 'error', - 'message': 'Iridium tools not found.' - }) + 'message': 'Iridium tools not found. Requires rtl_fm or iridium-extractor.' + }), 503 try: cmd = [ @@ -89,9 +128,17 @@ def start_iridium(): thread = threading.Thread(target=monitor_iridium, args=(app_module.satellite_process,), daemon=True) thread.start() - return jsonify({'status': 'started'}) + return jsonify({ + 'status': 'started', + 'demo_mode': DEMO_MODE, + 'message': 'Demo mode active - data is simulated' if DEMO_MODE else None + }) + except FileNotFoundError as e: + logger.error(f"Tool not found: {e}") + return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}), 503 except Exception as e: - return jsonify({'status': 'error', 'message': str(e)}) + logger.error(f"Start error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 @iridium_bp.route('/stop', methods=['POST']) @@ -113,12 +160,19 @@ def stop_iridium(): def stream_iridium(): """SSE stream for Iridium bursts.""" def generate(): + last_keepalive = time.time() + keepalive_interval = 30.0 + while True: try: msg = app_module.satellite_queue.get(timeout=1) - yield f"data: {json.dumps(msg)}\n\n" + last_keepalive = time.time() + yield format_sse(msg) except queue.Empty: - yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" + now = time.time() + if now - last_keepalive >= keepalive_interval: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' diff --git a/routes/pager.py b/routes/pager.py index dfa1636..d26dc60 100644 --- a/routes/pager.py +++ b/routes/pager.py @@ -3,12 +3,14 @@ from __future__ import annotations import os +import pathlib import re import pty import queue import select import subprocess import threading +import time from datetime import datetime from typing import Any, Generator @@ -16,6 +18,9 @@ from flask import Blueprint, jsonify, request, Response import app as app_module from utils.logging import pager_logger as logger +from utils.validation import validate_frequency, validate_device_index, validate_gain, validate_ppm +from utils.sse import format_sse +from utils.process import safe_terminate, register_process pager_bp = Blueprint('pager', __name__) @@ -147,15 +152,35 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: def start_decoding() -> Response: with app_module.process_lock: if app_module.current_process: - return jsonify({'status': 'error', 'message': 'Already running'}) + return jsonify({'status': 'error', 'message': 'Already running'}), 409 + + data = request.json or {} + + # Validate inputs + try: + freq = validate_frequency(data.get('frequency', '929.6125')) + gain = validate_gain(data.get('gain', '0')) + ppm = validate_ppm(data.get('ppm', '0')) + device = validate_device_index(data.get('device', '0')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 - data = request.json - freq = data.get('frequency', '929.6125') - gain = data.get('gain', '0') squelch = data.get('squelch', '0') - ppm = data.get('ppm', '0') - device = data.get('device', '0') - protocols = data.get('protocols', ['POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX']) + try: + squelch = int(squelch) + if not 0 <= squelch <= 1000: + raise ValueError("Squelch must be between 0 and 1000") + except (ValueError, TypeError): + return jsonify({'status': 'error', 'message': 'Invalid squelch value'}), 400 + + # Validate protocols + valid_protocols = ['POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX'] + protocols = data.get('protocols', valid_protocols) + if not isinstance(protocols, list): + return jsonify({'status': 'error', 'message': 'Protocols must be a list'}), 400 + protocols = [p for p in protocols if p in valid_protocols] + if not protocols: + protocols = valid_protocols # Clear queue while not app_module.output_queue.empty(): @@ -301,11 +326,34 @@ def get_status() -> Response: @pager_bp.route('/logging', methods=['POST']) def toggle_logging() -> Response: """Toggle message logging.""" - data = request.json + data = request.json or {} if 'enabled' in data: - app_module.logging_enabled = data['enabled'] + app_module.logging_enabled = bool(data['enabled']) + if 'log_file' in data and data['log_file']: - app_module.log_file_path = data['log_file'] + # Validate path to prevent directory traversal + try: + requested_path = pathlib.Path(data['log_file']).resolve() + # Only allow files in the current directory or logs subdirectory + cwd = pathlib.Path('.').resolve() + logs_dir = (cwd / 'logs').resolve() + + # Check if path is within allowed directories + is_in_cwd = str(requested_path).startswith(str(cwd)) + is_in_logs = str(requested_path).startswith(str(logs_dir)) + + if not (is_in_cwd or is_in_logs): + return jsonify({'status': 'error', 'message': 'Invalid log file path'}), 400 + + # Ensure it's not a directory + if requested_path.is_dir(): + return jsonify({'status': 'error', 'message': 'Log file path must be a file, not a directory'}), 400 + + app_module.log_file_path = str(requested_path) + except (ValueError, OSError) as e: + logger.warning(f"Invalid log file path: {e}") + return jsonify({'status': 'error', 'message': 'Invalid log file path'}), 400 + return jsonify({'logging': app_module.logging_enabled, 'log_file': app_module.log_file_path}) @@ -314,12 +362,19 @@ def stream() -> Response: import json def generate() -> Generator[str, None, None]: + last_keepalive = time.time() + keepalive_interval = 30.0 # Send keepalive every 30 seconds instead of 1 second + while True: try: msg = app_module.output_queue.get(timeout=1) - yield f"data: {json.dumps(msg)}\n\n" + last_keepalive = time.time() + yield format_sse(msg) except queue.Empty: - yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" + now = time.time() + if now - last_keepalive >= keepalive_interval: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' diff --git a/routes/satellite.py b/routes/satellite.py index ebc1e23..921be6a 100644 --- a/routes/satellite.py +++ b/routes/satellite.py @@ -6,14 +6,22 @@ import json import urllib.request from datetime import datetime, timedelta from typing import Any +from urllib.parse import urlparse from flask import Blueprint, jsonify, request, render_template, Response from data.satellites import TLE_SATELLITES from utils.logging import satellite_logger as logger +from utils.validation import validate_latitude, validate_longitude, validate_hours, validate_elevation satellite_bp = Blueprint('satellite', __name__, url_prefix='/satellite') +# Maximum response size for external requests (1MB) +MAX_RESPONSE_SIZE = 1024 * 1024 + +# Allowed hosts for TLE fetching +ALLOWED_TLE_HOSTS = ['celestrak.org', 'celestrak.com', 'www.celestrak.org', 'www.celestrak.com'] + # Local TLE cache (can be updated via API) _tle_cache = dict(TLE_SATELLITES) @@ -34,13 +42,18 @@ def predict_passes(): return jsonify({ 'status': 'error', 'message': 'skyfield library not installed. Run: pip install skyfield' - }) + }), 503 - data = request.json - lat = data.get('latitude', data.get('lat', 51.5074)) - lon = data.get('longitude', data.get('lon', -0.1278)) - hours = data.get('hours', 24) - min_el = data.get('minEl', 10) + data = request.json or {} + + # Validate inputs + try: + lat = validate_latitude(data.get('latitude', data.get('lat', 51.5074))) + lon = validate_longitude(data.get('longitude', data.get('lon', -0.1278))) + hours = validate_hours(data.get('hours', 24)) + min_el = validate_elevation(data.get('minEl', 10)) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 norad_to_name = { 25544: 'ISS', @@ -187,13 +200,19 @@ def get_satellite_position(): try: from skyfield.api import load, wgs84, EarthSatellite except ImportError: - return jsonify({'status': 'error', 'message': 'skyfield not installed'}) + return jsonify({'status': 'error', 'message': 'skyfield not installed'}), 503 + + data = request.json or {} + + # Validate inputs + try: + lat = validate_latitude(data.get('latitude', data.get('lat', 51.5074))) + lon = validate_longitude(data.get('longitude', data.get('lon', -0.1278))) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 - data = request.json - lat = data.get('latitude', data.get('lat', 51.5074)) - lon = data.get('longitude', data.get('lon', -0.1278)) sat_input = data.get('satellites', []) - include_track = data.get('includeTrack', True) + include_track = bool(data.get('includeTrack', True)) norad_to_name = { 25544: 'ISS', diff --git a/routes/sensor.py b/routes/sensor.py index bd722dc..a3e99fe 100644 --- a/routes/sensor.py +++ b/routes/sensor.py @@ -6,6 +6,7 @@ import json import queue import subprocess import threading +import time from datetime import datetime from typing import Generator @@ -13,6 +14,9 @@ from flask import Blueprint, jsonify, request, Response import app as app_module from utils.logging import sensor_logger as logger +from utils.validation import validate_frequency, validate_device_index, validate_gain, validate_ppm +from utils.sse import format_sse +from utils.process import safe_terminate, register_process sensor_bp = Blueprint('sensor', __name__) @@ -58,13 +62,18 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: def start_sensor() -> Response: with app_module.sensor_lock: if app_module.sensor_process: - return jsonify({'status': 'error', 'message': 'Sensor already running'}) + return jsonify({'status': 'error', 'message': 'Sensor already running'}), 409 - data = request.json - freq = data.get('frequency', '433.92') - gain = data.get('gain', '0') - ppm = data.get('ppm', '0') - device = data.get('device', '0') + data = request.json or {} + + # Validate inputs + try: + freq = validate_frequency(data.get('frequency', '433.92')) + gain = validate_gain(data.get('gain', '0')) + ppm = validate_ppm(data.get('ppm', '0')) + device = validate_device_index(data.get('device', '0')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 # Clear queue while not app_module.sensor_queue.empty(): @@ -81,10 +90,10 @@ def start_sensor() -> Response: '-F', 'json' ] - if gain and gain != '0': - cmd.extend(['-g', str(gain)]) + if gain and gain != 0: + cmd.extend(['-g', str(int(gain))]) - if ppm and ppm != '0': + if ppm and ppm != 0: cmd.extend(['-p', str(ppm)]) full_cmd = ' '.join(cmd) @@ -143,12 +152,19 @@ def stop_sensor() -> Response: @sensor_bp.route('/stream_sensor') def stream_sensor() -> Response: def generate() -> Generator[str, None, None]: + last_keepalive = time.time() + keepalive_interval = 30.0 + while True: try: msg = app_module.sensor_queue.get(timeout=1) - yield f"data: {json.dumps(msg)}\n\n" + last_keepalive = time.time() + yield format_sse(msg) except queue.Empty: - yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" + now = time.time() + if now - last_keepalive >= keepalive_interval: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' diff --git a/routes/wifi.py b/routes/wifi.py index f47e155..5c932b6 100644 --- a/routes/wifi.py +++ b/routes/wifi.py @@ -19,6 +19,8 @@ import app as app_module from utils.dependencies import check_tool from utils.logging import wifi_logger as logger from utils.process import is_valid_mac, is_valid_channel +from utils.validation import validate_wifi_channel, validate_mac_address +from utils.sse import format_sse from data.oui import get_manufacturer wifi_bp = Blueprint('wifi', __name__, url_prefix='/wifi') @@ -758,12 +760,19 @@ def get_wifi_networks(): def stream_wifi(): """SSE stream for WiFi events.""" def generate(): + last_keepalive = time.time() + keepalive_interval = 30.0 + while True: try: msg = app_module.wifi_queue.get(timeout=1) - yield f"data: {json.dumps(msg)}\n\n" + last_keepalive = time.time() + yield format_sse(msg) except queue.Empty: - yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" + now = time.time() + if now - last_keepalive >= keepalive_interval: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' diff --git a/utils/__init__.py b/utils/__init__.py index af4b50e..73e0717 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -1,6 +1,15 @@ # Utility modules for INTERCEPT from .dependencies import check_tool, check_all_dependencies, TOOL_DEPENDENCIES -from .process import cleanup_stale_processes, is_valid_mac, is_valid_channel, detect_devices +from .process import ( + cleanup_stale_processes, + is_valid_mac, + is_valid_channel, + detect_devices, + safe_terminate, + register_process, + unregister_process, + cleanup_all_processes, +) from .logging import ( get_logger, app_logger, @@ -12,3 +21,22 @@ from .logging import ( satellite_logger, iridium_logger, ) +from .validation import ( + escape_html, + validate_latitude, + validate_longitude, + validate_frequency, + validate_device_index, + validate_gain, + validate_ppm, + validate_hours, + validate_elevation, + validate_wifi_channel, + validate_mac_address, + validate_positive_int, + sanitize_callsign, + sanitize_ssid, + sanitize_device_name, +) +from .sse import sse_stream, format_sse, clear_queue +from .cleanup import DataStore, CleanupManager, cleanup_manager, cleanup_dict diff --git a/utils/cleanup.py b/utils/cleanup.py new file mode 100644 index 0000000..b5e3b5a --- /dev/null +++ b/utils/cleanup.py @@ -0,0 +1,241 @@ +"""Data cleanup utilities for stale entries.""" + +from __future__ import annotations + +import logging +import threading +import time +from typing import Any + +logger = logging.getLogger('intercept.cleanup') + + +class DataStore: + """Thread-safe data store with automatic cleanup of stale entries.""" + + def __init__(self, max_age_seconds: float = 300.0, name: str = 'data'): + """ + Initialize data store. + + Args: + max_age_seconds: Maximum age of entries before cleanup (default 5 minutes) + name: Name for logging purposes + """ + self.data: dict[str, Any] = {} + self.timestamps: dict[str, float] = {} + self.max_age = max_age_seconds + self.name = name + self._lock = threading.Lock() + + def set(self, key: str, value: Any) -> None: + """Add or update an entry.""" + with self._lock: + self.data[key] = value + self.timestamps[key] = time.time() + + def get(self, key: str, default: Any = None) -> Any: + """Get an entry.""" + with self._lock: + return self.data.get(key, default) + + def update(self, key: str, updates: dict) -> None: + """Update an existing entry with new values.""" + with self._lock: + if key in self.data: + if isinstance(self.data[key], dict): + self.data[key].update(updates) + else: + self.data[key] = updates + else: + self.data[key] = updates + self.timestamps[key] = time.time() + + def touch(self, key: str) -> None: + """Update timestamp for an entry without changing data.""" + with self._lock: + if key in self.data: + self.timestamps[key] = time.time() + + def delete(self, key: str) -> bool: + """Delete an entry.""" + with self._lock: + if key in self.data: + del self.data[key] + del self.timestamps[key] + return True + return False + + def clear(self) -> None: + """Clear all entries.""" + with self._lock: + self.data.clear() + self.timestamps.clear() + + def all(self) -> dict[str, Any]: + """Get a copy of all data.""" + with self._lock: + return dict(self.data) + + def keys(self) -> list[str]: + """Get all keys.""" + with self._lock: + return list(self.data.keys()) + + def values(self) -> list[Any]: + """Get all values.""" + with self._lock: + return list(self.data.values()) + + def items(self) -> list[tuple[str, Any]]: + """Get all items.""" + with self._lock: + return list(self.data.items()) + + def __len__(self) -> int: + with self._lock: + return len(self.data) + + def __contains__(self, key: str) -> bool: + with self._lock: + return key in self.data + + def cleanup(self) -> int: + """ + Remove entries older than max_age. + + Returns: + Number of entries removed + """ + now = time.time() + expired = [] + + with self._lock: + for key, timestamp in self.timestamps.items(): + if now - timestamp > self.max_age: + expired.append(key) + + for key in expired: + del self.data[key] + del self.timestamps[key] + + if expired: + logger.debug(f"{self.name}: Cleaned up {len(expired)} stale entries") + + return len(expired) + + +class CleanupManager: + """Manages periodic cleanup of multiple data stores.""" + + def __init__(self, interval: float = 60.0): + """ + Initialize cleanup manager. + + Args: + interval: Cleanup interval in seconds + """ + self.stores: list[DataStore] = [] + self.interval = interval + self._timer: threading.Timer | None = None + self._running = False + self._lock = threading.Lock() + + def register(self, store: DataStore) -> None: + """Register a data store for cleanup.""" + with self._lock: + if store not in self.stores: + self.stores.append(store) + + def unregister(self, store: DataStore) -> None: + """Unregister a data store.""" + with self._lock: + if store in self.stores: + self.stores.remove(store) + + def start(self) -> None: + """Start the cleanup timer.""" + with self._lock: + if self._running: + return + self._running = True + self._schedule_cleanup() + + def stop(self) -> None: + """Stop the cleanup timer.""" + with self._lock: + self._running = False + if self._timer: + self._timer.cancel() + self._timer = None + + def _schedule_cleanup(self) -> None: + """Schedule the next cleanup.""" + if not self._running: + return + self._timer = threading.Timer(self.interval, self._run_cleanup) + self._timer.daemon = True + self._timer.start() + + def _run_cleanup(self) -> None: + """Run cleanup on all registered stores.""" + total_cleaned = 0 + + with self._lock: + stores = list(self.stores) + + for store in stores: + try: + total_cleaned += store.cleanup() + except Exception as e: + logger.error(f"Error cleaning up {store.name}: {e}") + + if total_cleaned > 0: + logger.info(f"Cleanup complete: removed {total_cleaned} stale entries") + + self._schedule_cleanup() + + def cleanup_now(self) -> int: + """Run cleanup immediately.""" + total = 0 + with self._lock: + stores = list(self.stores) + for store in stores: + try: + total += store.cleanup() + except Exception as e: + logger.error(f"Error cleaning up {store.name}: {e}") + return total + + +# Global cleanup manager +cleanup_manager = CleanupManager(interval=60.0) + + +def cleanup_dict( + data: dict[str, Any], + timestamps: dict[str, float], + max_age_seconds: float = 300.0 +) -> list[str]: + """ + Clean up stale entries from a dictionary. + + Args: + data: Dictionary to clean + timestamps: Dictionary of key -> last_seen timestamp + max_age_seconds: Maximum age in seconds + + Returns: + List of removed keys + """ + now = time.time() + expired = [] + + for key, timestamp in list(timestamps.items()): + if now - timestamp > max_age_seconds: + expired.append(key) + + for key in expired: + data.pop(key, None) + timestamps.pop(key, None) + + return expired diff --git a/utils/process.py b/utils/process.py index 4c4c8de..dd4741c 100644 --- a/utils/process.py +++ b/utils/process.py @@ -1,11 +1,103 @@ from __future__ import annotations +import atexit +import logging +import signal import subprocess import re -from typing import Any +import threading +import time +from typing import Any, Callable from .dependencies import check_tool +logger = logging.getLogger('intercept.process') + +# Track all spawned processes for cleanup +_spawned_processes: list[subprocess.Popen] = [] +_process_lock = threading.Lock() + + +def register_process(process: subprocess.Popen) -> None: + """Register a spawned process for cleanup on exit.""" + with _process_lock: + _spawned_processes.append(process) + + +def unregister_process(process: subprocess.Popen) -> None: + """Unregister a process from cleanup list.""" + with _process_lock: + if process in _spawned_processes: + _spawned_processes.remove(process) + + +def cleanup_all_processes() -> None: + """Clean up all registered processes on exit.""" + logger.info("Cleaning up all spawned processes...") + with _process_lock: + for process in _spawned_processes: + if process and process.poll() is None: + try: + process.terminate() + process.wait(timeout=2) + except subprocess.TimeoutExpired: + process.kill() + except Exception as e: + logger.warning(f"Error cleaning up process: {e}") + _spawned_processes.clear() + + +def safe_terminate(process: subprocess.Popen | None, timeout: float = 2.0) -> bool: + """ + Safely terminate a process. + + Args: + process: Process to terminate + timeout: Seconds to wait before killing + + Returns: + True if process was terminated, False if already dead or None + """ + if not process: + return False + + if process.poll() is not None: + # Already dead + unregister_process(process) + return False + + try: + process.terminate() + process.wait(timeout=timeout) + unregister_process(process) + return True + except subprocess.TimeoutExpired: + process.kill() + unregister_process(process) + return True + except Exception as e: + logger.warning(f"Error terminating process: {e}") + return False + + +# Register cleanup handlers +atexit.register(cleanup_all_processes) + +# Handle signals for graceful shutdown +def _signal_handler(signum, frame): + """Handle termination signals.""" + logger.info(f"Received signal {signum}, cleaning up...") + cleanup_all_processes() + + +# Only register signal handlers if we're not in a thread +try: + signal.signal(signal.SIGTERM, _signal_handler) + signal.signal(signal.SIGINT, _signal_handler) +except ValueError: + # Can't set signal handlers from a thread + pass + def cleanup_stale_processes() -> None: """Kill any stale processes from previous runs (but not system services).""" diff --git a/utils/sse.py b/utils/sse.py new file mode 100644 index 0000000..c796c32 --- /dev/null +++ b/utils/sse.py @@ -0,0 +1,89 @@ +"""Server-Sent Events (SSE) utilities.""" + +from __future__ import annotations + +import json +import queue +import time +from typing import Any, Generator + + +def sse_stream( + data_queue: queue.Queue, + timeout: float = 1.0, + keepalive_interval: float = 30.0, + stop_check: callable = None +) -> Generator[str, None, None]: + """ + Generate SSE stream from a queue. + + Args: + data_queue: Queue to read messages from + timeout: Queue get timeout in seconds + keepalive_interval: Seconds between keepalive messages + stop_check: Optional callable that returns True to stop the stream + + Yields: + SSE formatted strings + """ + last_keepalive = time.time() + + while True: + # Check if we should stop + if stop_check and stop_check(): + break + + try: + msg = data_queue.get(timeout=timeout) + last_keepalive = time.time() + yield format_sse(msg) + except queue.Empty: + # Send keepalive if enough time has passed + now = time.time() + if now - last_keepalive >= keepalive_interval: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now + + +def format_sse(data: dict[str, Any] | str, event: str | None = None) -> str: + """ + Format data as SSE message. + + Args: + data: Data to send (will be JSON encoded if dict) + event: Optional event name + + Returns: + SSE formatted string + """ + if isinstance(data, dict): + data = json.dumps(data) + + lines = [] + if event: + lines.append(f"event: {event}") + lines.append(f"data: {data}") + lines.append("") + lines.append("") + + return '\n'.join(lines) + + +def clear_queue(q: queue.Queue) -> int: + """ + Clear all items from a queue. + + Args: + q: Queue to clear + + Returns: + Number of items cleared + """ + count = 0 + while True: + try: + q.get_nowait() + count += 1 + except queue.Empty: + break + return count diff --git a/utils/validation.py b/utils/validation.py new file mode 100644 index 0000000..a6bee0e --- /dev/null +++ b/utils/validation.py @@ -0,0 +1,171 @@ +"""Input validation utilities for API endpoints.""" + +from __future__ import annotations + +import re +from typing import Any + + +def escape_html(text: str | None) -> str: + """Escape HTML special characters to prevent XSS attacks.""" + if text is None: + return '' + if not isinstance(text, str): + text = str(text) + html_escape_table = { + '&': '&', + '<': '<', + '>': '>', + '"': '"', + "'": ''', + } + return ''.join(html_escape_table.get(c, c) for c in text) + + +def validate_latitude(lat: Any) -> float: + """Validate and return latitude value.""" + try: + lat_float = float(lat) + if not -90 <= lat_float <= 90: + raise ValueError(f"Latitude must be between -90 and 90, got {lat_float}") + return lat_float + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid latitude: {lat}") from e + + +def validate_longitude(lon: Any) -> float: + """Validate and return longitude value.""" + try: + lon_float = float(lon) + if not -180 <= lon_float <= 180: + raise ValueError(f"Longitude must be between -180 and 180, got {lon_float}") + return lon_float + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid longitude: {lon}") from e + + +def validate_frequency(freq: Any, min_mhz: float = 24.0, max_mhz: float = 1766.0) -> float: + """Validate and return frequency in MHz.""" + try: + freq_float = float(freq) + if not min_mhz <= freq_float <= max_mhz: + raise ValueError(f"Frequency must be between {min_mhz} and {max_mhz} MHz, got {freq_float}") + return freq_float + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid frequency: {freq}") from e + + +def validate_device_index(device: Any) -> int: + """Validate and return RTL-SDR device index.""" + try: + device_int = int(device) + if not 0 <= device_int <= 255: + raise ValueError(f"Device index must be between 0 and 255, got {device_int}") + return device_int + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid device index: {device}") from e + + +def validate_gain(gain: Any) -> float: + """Validate and return gain value.""" + try: + gain_float = float(gain) + if not 0 <= gain_float <= 50: + raise ValueError(f"Gain must be between 0 and 50, got {gain_float}") + return gain_float + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid gain: {gain}") from e + + +def validate_ppm(ppm: Any) -> int: + """Validate and return PPM correction value.""" + try: + ppm_int = int(ppm) + if not -1000 <= ppm_int <= 1000: + raise ValueError(f"PPM must be between -1000 and 1000, got {ppm_int}") + return ppm_int + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid PPM: {ppm}") from e + + +def validate_hours(hours: Any, min_hours: int = 1, max_hours: int = 168) -> int: + """Validate and return hours value (for satellite predictions).""" + try: + hours_int = int(hours) + if not min_hours <= hours_int <= max_hours: + raise ValueError(f"Hours must be between {min_hours} and {max_hours}, got {hours_int}") + return hours_int + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid hours: {hours}") from e + + +def validate_elevation(elevation: Any) -> float: + """Validate and return elevation angle.""" + try: + el_float = float(elevation) + if not 0 <= el_float <= 90: + raise ValueError(f"Elevation must be between 0 and 90, got {el_float}") + return el_float + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid elevation: {elevation}") from e + + +def validate_wifi_channel(channel: Any) -> int: + """Validate and return WiFi channel.""" + try: + ch_int = int(channel) + # Valid WiFi channels: 1-14 (2.4GHz), 32-177 (5GHz) + valid_2ghz = 1 <= ch_int <= 14 + valid_5ghz = 32 <= ch_int <= 177 + if not (valid_2ghz or valid_5ghz): + raise ValueError(f"Invalid WiFi channel: {ch_int}") + return ch_int + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid WiFi channel: {channel}") from e + + +def validate_mac_address(mac: Any) -> str: + """Validate and return MAC address.""" + if not mac or not isinstance(mac, str): + raise ValueError("MAC address is required") + mac = mac.upper().strip() + if not re.match(r'^([0-9A-F]{2}:){5}[0-9A-F]{2}$', mac): + raise ValueError(f"Invalid MAC address format: {mac}") + return mac + + +def validate_positive_int(value: Any, name: str = 'value', max_val: int | None = None) -> int: + """Validate and return a positive integer.""" + try: + val_int = int(value) + if val_int < 0: + raise ValueError(f"{name} must be positive, got {val_int}") + if max_val is not None and val_int > max_val: + raise ValueError(f"{name} must be <= {max_val}, got {val_int}") + return val_int + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid {name}: {value}") from e + + +def sanitize_callsign(callsign: str | None) -> str: + """Sanitize aircraft callsign for display.""" + if not callsign: + return '' + # Only allow alphanumeric, dash, and space + return re.sub(r'[^A-Za-z0-9\- ]', '', str(callsign))[:10] + + +def sanitize_ssid(ssid: str | None) -> str: + """Sanitize WiFi SSID for display.""" + if not ssid: + return '' + # Escape HTML and limit length + return escape_html(str(ssid)[:64]) + + +def sanitize_device_name(name: str | None) -> str: + """Sanitize Bluetooth device name for display.""" + if not name: + return '' + # Escape HTML and limit length + return escape_html(str(name)[:64])