"""VDL2 aircraft datalink routes.""" from __future__ import annotations import json import queue import shutil import subprocess import threading import time from datetime import datetime from typing import Generator from flask import Blueprint, jsonify, request, Response import app as app_module from utils.logging import sensor_logger as logger from utils.validation import validate_device_index, validate_gain, validate_ppm from utils.sdr import SDRFactory, SDRType from utils.sse import format_sse from utils.event_pipeline import process_event from utils.constants import ( PROCESS_TERMINATE_TIMEOUT, SSE_KEEPALIVE_INTERVAL, SSE_QUEUE_TIMEOUT, PROCESS_START_WAIT, ) from utils.process import register_process, unregister_process vdl2_bp = Blueprint('vdl2', __name__, url_prefix='/vdl2') # Default VDL2 frequencies (MHz) - common worldwide DEFAULT_VDL2_FREQUENCIES = [ '136975000', # Primary worldwide '136725000', # Europe '136775000', # Europe '136800000', # Multi-region '136875000', # Multi-region ] # Message counter for statistics vdl2_message_count = 0 vdl2_last_message_time = None # Track which device is being used vdl2_active_device: int | None = None def find_dumpvdl2(): """Find dumpvdl2 binary.""" return shutil.which('dumpvdl2') def stream_vdl2_output(process: subprocess.Popen) -> None: """Stream dumpvdl2 JSON output to queue.""" global vdl2_message_count, vdl2_last_message_time try: app_module.vdl2_queue.put({'type': 'status', 'status': 'started'}) for line in iter(process.stdout.readline, b''): line = line.decode('utf-8', errors='replace').strip() if not line: continue try: data = json.loads(line) # Add our metadata data['type'] = 'vdl2' data['timestamp'] = datetime.utcnow().isoformat() + 'Z' # Update stats vdl2_message_count += 1 vdl2_last_message_time = time.time() app_module.vdl2_queue.put(data) # Log if enabled if app_module.logging_enabled: try: with open(app_module.log_file_path, 'a') as f: ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S') f.write(f"{ts} | VDL2 | {json.dumps(data)}\n") except Exception: pass except json.JSONDecodeError: # Not JSON - could be status message if line: logger.debug(f"dumpvdl2 non-JSON: {line[:100]}") except Exception as e: logger.error(f"VDL2 stream error: {e}") app_module.vdl2_queue.put({'type': 'error', 'message': str(e)}) finally: global vdl2_active_device # Ensure process is terminated try: process.terminate() process.wait(timeout=2) except Exception: try: process.kill() except Exception: pass unregister_process(process) app_module.vdl2_queue.put({'type': 'status', 'status': 'stopped'}) with app_module.vdl2_lock: app_module.vdl2_process = None # Release SDR device if vdl2_active_device is not None: app_module.release_sdr_device(vdl2_active_device) vdl2_active_device = None @vdl2_bp.route('/tools') def check_vdl2_tools() -> Response: """Check for VDL2 decoding tools.""" has_dumpvdl2 = find_dumpvdl2() is not None return jsonify({ 'dumpvdl2': has_dumpvdl2, 'ready': has_dumpvdl2 }) @vdl2_bp.route('/status') def vdl2_status() -> Response: """Get VDL2 decoder status.""" running = False if app_module.vdl2_process: running = app_module.vdl2_process.poll() is None return jsonify({ 'running': running, 'message_count': vdl2_message_count, 'last_message_time': vdl2_last_message_time, 'queue_size': app_module.vdl2_queue.qsize() }) @vdl2_bp.route('/start', methods=['POST']) def start_vdl2() -> Response: """Start VDL2 decoder.""" global vdl2_message_count, vdl2_last_message_time, vdl2_active_device with app_module.vdl2_lock: if app_module.vdl2_process and app_module.vdl2_process.poll() is None: return jsonify({ 'status': 'error', 'message': 'VDL2 decoder already running' }), 409 # Check for dumpvdl2 dumpvdl2_path = find_dumpvdl2() if not dumpvdl2_path: return jsonify({ 'status': 'error', 'message': 'dumpvdl2 not found. Install from: https://github.com/szpajder/dumpvdl2' }), 400 data = request.json or {} # Validate inputs try: device = validate_device_index(data.get('device', '0')) gain = validate_gain(data.get('gain', '40')) ppm = validate_ppm(data.get('ppm', '0')) except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 # Check if device is available device_int = int(device) error = app_module.claim_sdr_device(device_int, 'vdl2') if error: return jsonify({ 'status': 'error', 'error_type': 'DEVICE_BUSY', 'message': error }), 409 vdl2_active_device = device_int # Get frequencies - use provided or defaults # dumpvdl2 expects frequencies in Hz (integers) frequencies = data.get('frequencies', DEFAULT_VDL2_FREQUENCIES) if isinstance(frequencies, str): frequencies = [f.strip() for f in frequencies.split(',')] # Clear queue while not app_module.vdl2_queue.empty(): try: app_module.vdl2_queue.get_nowait() except queue.Empty: break # Reset stats vdl2_message_count = 0 vdl2_last_message_time = None # Resolve SDR type for device selection sdr_type_str = data.get('sdr_type', 'rtlsdr') try: sdr_type = SDRType(sdr_type_str) except ValueError: sdr_type = SDRType.RTL_SDR is_soapy = sdr_type not in (SDRType.RTL_SDR,) # Build dumpvdl2 command # dumpvdl2 --output decoded:json --rtlsdr --gain --correction ... cmd = [dumpvdl2_path] cmd.extend(['--output', 'decoded:json:file:path=-']) if is_soapy: # SoapySDR device sdr_device = SDRFactory.create_default_device(sdr_type, index=device_int) builder = SDRFactory.get_builder(sdr_type) device_str = builder._build_device_string(sdr_device) cmd.extend(['--soapysdr', device_str]) else: cmd.extend(['--rtlsdr', str(device)]) # Add gain if gain and str(gain) != '0': cmd.extend(['--gain', str(gain)]) # Add PPM correction if specified if ppm and str(ppm) != '0': cmd.extend(['--correction', str(ppm)]) # Add frequencies (dumpvdl2 takes them as positional args in Hz) cmd.extend(frequencies) logger.info(f"Starting VDL2 decoder: {' '.join(cmd)}") try: process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True ) # Wait briefly to check if process started time.sleep(PROCESS_START_WAIT) if process.poll() is not None: # Process died - release device if vdl2_active_device is not None: app_module.release_sdr_device(vdl2_active_device) vdl2_active_device = None stderr = '' if process.stderr: stderr = process.stderr.read().decode('utf-8', errors='replace') error_msg = 'dumpvdl2 failed to start' if stderr: error_msg += f': {stderr[:200]}' logger.error(error_msg) return jsonify({'status': 'error', 'message': error_msg}), 500 app_module.vdl2_process = process register_process(process) # Start output streaming thread thread = threading.Thread( target=stream_vdl2_output, args=(process,), daemon=True ) thread.start() return jsonify({ 'status': 'started', 'frequencies': frequencies, 'device': device, 'gain': gain }) except Exception as e: # Release device on failure if vdl2_active_device is not None: app_module.release_sdr_device(vdl2_active_device) vdl2_active_device = None logger.error(f"Failed to start VDL2 decoder: {e}") return jsonify({'status': 'error', 'message': str(e)}), 500 @vdl2_bp.route('/stop', methods=['POST']) def stop_vdl2() -> Response: """Stop VDL2 decoder.""" global vdl2_active_device with app_module.vdl2_lock: if not app_module.vdl2_process: return jsonify({ 'status': 'error', 'message': 'VDL2 decoder not running' }), 400 try: app_module.vdl2_process.terminate() app_module.vdl2_process.wait(timeout=PROCESS_TERMINATE_TIMEOUT) except subprocess.TimeoutExpired: app_module.vdl2_process.kill() except Exception as e: logger.error(f"Error stopping VDL2: {e}") app_module.vdl2_process = None # Release device from registry if vdl2_active_device is not None: app_module.release_sdr_device(vdl2_active_device) vdl2_active_device = None return jsonify({'status': 'stopped'}) @vdl2_bp.route('/stream') def stream_vdl2() -> Response: """SSE stream for VDL2 messages.""" def generate() -> Generator[str, None, None]: last_keepalive = time.time() while True: try: msg = app_module.vdl2_queue.get(timeout=SSE_QUEUE_TIMEOUT) last_keepalive = time.time() try: process_event('vdl2', msg, msg.get('type')) except Exception: pass yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL: yield format_sse({'type': 'keepalive'}) last_keepalive = now response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' response.headers['X-Accel-Buffering'] = 'no' return response @vdl2_bp.route('/frequencies') def get_frequencies() -> Response: """Get default VDL2 frequencies.""" return jsonify({ 'default': DEFAULT_VDL2_FREQUENCIES, 'regions': { 'north_america': ['136975000', '136100000', '136650000', '136700000', '136800000'], 'europe': ['136975000', '136675000', '136725000', '136775000', '136825000'], 'asia_pacific': ['136975000', '136900000'], } })