Fix remote VDL2 streaming path and improve decoder reliability

This commit is contained in:
Smittix
2026-02-19 15:57:13 +00:00
parent bbc25ddaa0
commit 06a00ca6b5
3 changed files with 81 additions and 101 deletions

View File

@@ -1,13 +1,17 @@
"""VDL2 aircraft datalink routes."""
from __future__ import annotations
import json
import queue
import shutil
import subprocess
import threading
import time
from __future__ import annotations
import io
import json
import os
import platform
import pty
import queue
import shutil
import subprocess
import threading
import time
from datetime import datetime
from typing import Generator
@@ -51,17 +55,22 @@ def find_dumpvdl2():
return shutil.which('dumpvdl2')
def stream_vdl2_output(process: subprocess.Popen) -> None:
"""Stream dumpvdl2 JSON output to queue."""
global vdl2_message_count, vdl2_last_message_time
try:
app_module.vdl2_queue.put({'type': 'status', 'status': 'started'})
for line in iter(process.stdout.readline, b''):
line = line.decode('utf-8', errors='replace').strip()
if not line:
continue
def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) -> None:
"""Stream dumpvdl2 JSON output to queue."""
global vdl2_message_count, vdl2_last_message_time
try:
app_module.vdl2_queue.put({'type': 'status', 'status': 'started'})
# Use appropriate sentinel based on mode (text mode for pty on macOS)
sentinel = '' if is_text_mode else b''
for line in iter(process.stdout.readline, sentinel):
if is_text_mode:
line = line.strip()
else:
line = line.decode('utf-8', errors='replace').strip()
if not line:
continue
try:
data = json.loads(line)
@@ -243,12 +252,28 @@ def start_vdl2() -> Response:
logger.info(f"Starting VDL2 decoder: {' '.join(cmd)}")
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True
)
is_text_mode = False
# On macOS, use pty to avoid stdout buffering issues
if platform.system() == 'Darwin':
master_fd, slave_fd = pty.openpty()
process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=subprocess.PIPE,
start_new_session=True
)
os.close(slave_fd)
# Wrap master_fd as a text file for line-buffered reading
process.stdout = io.open(master_fd, 'r', buffering=1)
is_text_mode = True
else:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True
)
# Wait briefly to check if process started
time.sleep(PROCESS_START_WAIT)
@@ -270,12 +295,12 @@ def start_vdl2() -> Response:
app_module.vdl2_process = process
register_process(process)
# Start output streaming thread
thread = threading.Thread(
target=stream_vdl2_output,
args=(process,),
daemon=True
)
# Start output streaming thread
thread = threading.Thread(
target=stream_vdl2_output,
args=(process, is_text_mode),
daemon=True
)
thread.start()
return jsonify({

View File

@@ -485,7 +485,7 @@ async function syncLocalModeStates() {
*/
function showAgentModeWarnings(runningModes, modesDetail = {}) {
// SDR modes that can't run simultaneously on same device
const sdrModes = ['sensor', 'pager', 'adsb', 'ais', 'acars', 'aprs', 'rtlamr', 'listening_post', 'tscm', 'dsc'];
const sdrModes = ['sensor', 'pager', 'adsb', 'ais', 'acars', 'vdl2', 'aprs', 'rtlamr', 'listening_post', 'tscm', 'dsc'];
const runningSdrModes = runningModes.filter(m => sdrModes.includes(m));
let warning = document.getElementById('agentModeWarning');
@@ -621,7 +621,7 @@ function checkAgentModeConflict(modeToStart, deviceToUse = null) {
return false;
}
const sdrModes = ['sensor', 'pager', 'adsb', 'ais', 'acars', 'aprs', 'rtlamr', 'listening_post', 'tscm', 'dsc'];
const sdrModes = ['sensor', 'pager', 'adsb', 'ais', 'acars', 'vdl2', 'aprs', 'rtlamr', 'listening_post', 'tscm', 'dsc'];
// If we're trying to start an SDR mode
if (sdrModes.includes(modeToStart)) {

View File

@@ -3879,36 +3879,37 @@ sudo make install</code>
function startVdl2Stream(isAgentMode = false) {
if (vdl2EventSource) vdl2EventSource.close();
// Use different stream endpoint for agent mode
const streamUrl = isAgentMode ? '/controller/stream/all' : '/vdl2/stream';
// For remote agent mode, stream directly from the selected agent via controller proxy.
// This does not depend on push ingestion being enabled.
const streamUrl = isAgentMode && vdl2CurrentAgent !== null
? `/controller/agents/${vdl2CurrentAgent}/vdl2/stream`
: '/vdl2/stream';
vdl2EventSource = new EventSource(streamUrl);
vdl2EventSource.onmessage = function(e) {
const data = JSON.parse(e.data);
let message = null;
if (isAgentMode) {
// Handle multi-agent stream format
if (data.scan_type === 'vdl2' && data.payload) {
const payload = data.payload;
if (payload.type === 'vdl2') {
vdl2MessageCount++;
if (typeof stats !== 'undefined') stats.vdl2Messages = (stats.vdl2Messages || 0) + 1;
document.getElementById('vdl2Count').textContent = vdl2MessageCount;
document.getElementById('stripVdl2').textContent = vdl2MessageCount;
payload.agent_name = data.agent_name;
addVdl2Message(payload);
}
// Backward compatibility: handle wrapped multi-agent payloads if encountered.
if (data.scan_type === 'vdl2' && data.payload && data.payload.type === 'vdl2') {
message = data.payload;
if (isAgentMode) {
message.agent_name = data.agent_name || message.agent_name || 'Remote Agent';
}
} else {
// Local stream format
if (data.type === 'vdl2') {
vdl2MessageCount++;
if (typeof stats !== 'undefined') stats.vdl2Messages = (stats.vdl2Messages || 0) + 1;
document.getElementById('vdl2Count').textContent = vdl2MessageCount;
document.getElementById('stripVdl2').textContent = vdl2MessageCount;
addVdl2Message(data);
} else if (data.type === 'vdl2') {
message = data;
if (isAgentMode && !message.agent_name) {
message.agent_name = 'Remote Agent';
}
}
if (message) {
vdl2MessageCount++;
if (typeof stats !== 'undefined') stats.vdl2Messages = (stats.vdl2Messages || 0) + 1;
document.getElementById('vdl2Count').textContent = vdl2MessageCount;
document.getElementById('stripVdl2').textContent = vdl2MessageCount;
addVdl2Message(message);
}
};
vdl2EventSource.onerror = function() {
@@ -3919,52 +3920,6 @@ sudo make install</code>
}
}, 2000);
};
// Start polling fallback for agent mode
if (isAgentMode) {
startVdl2Polling();
}
}
// Track last VDL2 message count for polling
let lastVdl2MessageCount = 0;
function startVdl2Polling() {
if (vdl2PollTimer) return;
lastVdl2MessageCount = 0;
const pollInterval = 2000;
vdl2PollTimer = setInterval(async () => {
if (!isVdl2Running || !vdl2CurrentAgent) {
clearInterval(vdl2PollTimer);
vdl2PollTimer = null;
return;
}
try {
const response = await fetch(`/controller/agents/${vdl2CurrentAgent}/vdl2/data`);
if (!response.ok) return;
const data = await response.json();
const result = data.result || data;
const messages = result.data || [];
if (messages.length > lastVdl2MessageCount) {
const newMessages = messages.slice(lastVdl2MessageCount);
newMessages.forEach(msg => {
vdl2MessageCount++;
if (typeof stats !== 'undefined') stats.vdl2Messages = (stats.vdl2Messages || 0) + 1;
document.getElementById('vdl2Count').textContent = vdl2MessageCount;
document.getElementById('stripVdl2').textContent = vdl2MessageCount;
msg.agent_name = result.agent_name || 'Remote Agent';
addVdl2Message(msg);
});
lastVdl2MessageCount = messages.length;
}
} catch (err) {
console.error('VDL2 polling error:', err);
}
}, pollInterval);
}
function escapeHtml(str) {