Files
intercept/routes/radiosonde.py
Smittix 5b06c57565 feat: add radiosonde weather balloon tracking mode
Integrate radiosonde_auto_rx for automatic weather balloon detection and
decoding on 400-406 MHz. Includes UDP telemetry parsing, Leaflet map with
altitude-colored markers and trajectory tracks, SDR device registry
integration, setup script installation, and Docker support.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 10:46:33 +00:00

548 lines
16 KiB
Python

"""Radiosonde weather balloon tracking routes.
Uses radiosonde_auto_rx to automatically scan for and decode radiosonde
telemetry (position, altitude, temperature, humidity, pressure) on the
400-406 MHz band. Telemetry arrives as JSON over UDP.
"""
from __future__ import annotations
import json
import os
import queue
import shutil
import socket
import subprocess
import threading
import time
from typing import Any
from flask import Blueprint, Response, jsonify, request
import app as app_module
from utils.constants import (
MAX_RADIOSONDE_AGE_SECONDS,
PROCESS_TERMINATE_TIMEOUT,
RADIOSONDE_TERMINATE_TIMEOUT,
RADIOSONDE_UDP_PORT,
SSE_KEEPALIVE_INTERVAL,
SSE_QUEUE_TIMEOUT,
)
from utils.logging import get_logger
from utils.sdr import SDRFactory, SDRType
from utils.sse import sse_stream_fanout
from utils.validation import validate_device_index, validate_gain
logger = get_logger('intercept.radiosonde')
radiosonde_bp = Blueprint('radiosonde', __name__, url_prefix='/radiosonde')
# Track radiosonde state
radiosonde_running = False
radiosonde_active_device: int | None = None
radiosonde_active_sdr_type: str | None = None
# Active balloon data: serial -> telemetry dict
radiosonde_balloons: dict[str, dict[str, Any]] = {}
_balloons_lock = threading.Lock()
# UDP listener socket reference (so /stop can close it)
_udp_socket: socket.socket | None = None
# Common installation paths for radiosonde_auto_rx
AUTO_RX_PATHS = [
'/opt/radiosonde_auto_rx/auto_rx/auto_rx.py',
'/usr/local/bin/radiosonde_auto_rx',
'/opt/auto_rx/auto_rx.py',
]
def find_auto_rx() -> str | None:
"""Find radiosonde_auto_rx script/binary."""
# Check PATH first
path = shutil.which('radiosonde_auto_rx')
if path:
return path
# Check common locations
for p in AUTO_RX_PATHS:
if os.path.isfile(p) and os.access(p, os.X_OK):
return p
# Check for Python script (not executable but runnable)
for p in AUTO_RX_PATHS:
if os.path.isfile(p):
return p
return None
def generate_station_cfg(
freq_min: float = 400.0,
freq_max: float = 406.0,
gain: float = 40.0,
device_index: int = 0,
ppm: int = 0,
bias_t: bool = False,
udp_port: int = RADIOSONDE_UDP_PORT,
) -> str:
"""Generate a station.cfg for radiosonde_auto_rx and return the file path."""
cfg_dir = os.path.join('data', 'radiosonde')
os.makedirs(cfg_dir, exist_ok=True)
cfg_path = os.path.join(cfg_dir, 'station.cfg')
# Minimal station.cfg that auto_rx needs
cfg = f"""# Auto-generated by INTERCEPT for radiosonde_auto_rx
[search_params]
min_freq = {freq_min}
max_freq = {freq_max}
rx_timeout = 180
whitelist = []
blacklist = []
greylist = []
[sdr]
sdr_type = rtlsdr
rtlsdr_device_idx = {device_index}
rtlsdr_gain = {gain}
rtlsdr_ppm = {ppm}
rtlsdr_bias = {str(bias_t).lower()}
[habitat]
upload_enabled = False
[aprs]
upload_enabled = False
[sondehub]
upload_enabled = False
[positioning]
station_lat = 0.0
station_lon = 0.0
station_alt = 0.0
[logging]
per_sonde_log = True
log_directory = ./data/radiosonde/logs
[advanced]
web_host = 127.0.0.1
web_port = 0
udp_broadcast_port = {udp_port}
"""
with open(cfg_path, 'w') as f:
f.write(cfg)
logger.info(f"Generated station.cfg at {cfg_path}")
return cfg_path
def parse_radiosonde_udp(udp_port: int) -> None:
"""Thread function: listen for radiosonde_auto_rx UDP JSON telemetry."""
global radiosonde_running, _udp_socket
logger.info(f"Radiosonde UDP listener started on port {udp_port}")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', udp_port))
sock.settimeout(2.0)
_udp_socket = sock
except OSError as e:
logger.error(f"Failed to bind UDP port {udp_port}: {e}")
return
while radiosonde_running:
try:
data, _addr = sock.recvfrom(4096)
except socket.timeout:
# Clean up stale balloons
_cleanup_stale_balloons()
continue
except OSError:
break
try:
msg = json.loads(data.decode('utf-8', errors='ignore'))
except (json.JSONDecodeError, UnicodeDecodeError):
continue
balloon = _process_telemetry(msg)
if balloon:
serial = balloon.get('id', '')
if serial:
with _balloons_lock:
radiosonde_balloons[serial] = balloon
try:
app_module.radiosonde_queue.put_nowait({
'type': 'balloon',
**balloon,
})
except queue.Full:
pass
try:
sock.close()
except OSError:
pass
_udp_socket = None
logger.info("Radiosonde UDP listener stopped")
def _process_telemetry(msg: dict) -> dict | None:
"""Extract relevant fields from a radiosonde_auto_rx UDP telemetry packet."""
# auto_rx broadcasts packets with a 'type' field
# Telemetry packets have type 'payload_summary' or individual sonde data
serial = msg.get('id') or msg.get('serial')
if not serial:
return None
balloon: dict[str, Any] = {'id': str(serial)}
# Sonde type (RS41, RS92, DFM, M10, etc.)
if 'type' in msg:
balloon['sonde_type'] = msg['type']
if 'subtype' in msg:
balloon['sonde_type'] = msg['subtype']
# Timestamp
if 'datetime' in msg:
balloon['datetime'] = msg['datetime']
# Position
for key in ('lat', 'latitude'):
if key in msg:
try:
balloon['lat'] = float(msg[key])
except (ValueError, TypeError):
pass
break
for key in ('lon', 'longitude'):
if key in msg:
try:
balloon['lon'] = float(msg[key])
except (ValueError, TypeError):
pass
break
# Altitude (metres)
if 'alt' in msg:
try:
balloon['alt'] = float(msg['alt'])
except (ValueError, TypeError):
pass
# Meteorological data
for field in ('temp', 'humidity', 'pressure'):
if field in msg:
try:
balloon[field] = float(msg[field])
except (ValueError, TypeError):
pass
# Velocity
if 'vel_h' in msg:
try:
balloon['vel_h'] = float(msg['vel_h'])
except (ValueError, TypeError):
pass
if 'vel_v' in msg:
try:
balloon['vel_v'] = float(msg['vel_v'])
except (ValueError, TypeError):
pass
if 'heading' in msg:
try:
balloon['heading'] = float(msg['heading'])
except (ValueError, TypeError):
pass
# GPS satellites
if 'sats' in msg:
try:
balloon['sats'] = int(msg['sats'])
except (ValueError, TypeError):
pass
# Battery voltage
if 'batt' in msg:
try:
balloon['batt'] = float(msg['batt'])
except (ValueError, TypeError):
pass
# Frequency
if 'freq' in msg:
try:
balloon['freq'] = float(msg['freq'])
except (ValueError, TypeError):
pass
balloon['last_seen'] = time.time()
return balloon
def _cleanup_stale_balloons() -> None:
"""Remove balloons not seen within the retention window."""
now = time.time()
with _balloons_lock:
stale = [
k for k, v in radiosonde_balloons.items()
if now - v.get('last_seen', 0) > MAX_RADIOSONDE_AGE_SECONDS
]
for k in stale:
del radiosonde_balloons[k]
@radiosonde_bp.route('/tools')
def check_tools():
"""Check for radiosonde decoding tools and hardware."""
auto_rx_path = find_auto_rx()
devices = SDRFactory.detect_devices()
has_rtlsdr = any(d.sdr_type == SDRType.RTL_SDR for d in devices)
return jsonify({
'auto_rx': auto_rx_path is not None,
'auto_rx_path': auto_rx_path,
'has_rtlsdr': has_rtlsdr,
'device_count': len(devices),
})
@radiosonde_bp.route('/status')
def radiosonde_status():
"""Get radiosonde tracking status."""
process_running = False
if app_module.radiosonde_process:
process_running = app_module.radiosonde_process.poll() is None
with _balloons_lock:
balloon_count = len(radiosonde_balloons)
balloons_snapshot = dict(radiosonde_balloons)
return jsonify({
'tracking_active': radiosonde_running,
'active_device': radiosonde_active_device,
'balloon_count': balloon_count,
'balloons': balloons_snapshot,
'queue_size': app_module.radiosonde_queue.qsize(),
'auto_rx_path': find_auto_rx(),
'process_running': process_running,
})
@radiosonde_bp.route('/start', methods=['POST'])
def start_radiosonde():
"""Start radiosonde tracking."""
global radiosonde_running, radiosonde_active_device, radiosonde_active_sdr_type
with app_module.radiosonde_lock:
if radiosonde_running:
return jsonify({
'status': 'already_running',
'message': 'Radiosonde tracking already active',
}), 409
data = request.json or {}
# Validate inputs
try:
gain = float(validate_gain(data.get('gain', '40')))
device = validate_device_index(data.get('device', '0'))
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
freq_min = data.get('freq_min', 400.0)
freq_max = data.get('freq_max', 406.0)
try:
freq_min = float(freq_min)
freq_max = float(freq_max)
if not (380.0 <= freq_min <= 410.0) or not (380.0 <= freq_max <= 410.0):
raise ValueError("Frequency out of range")
if freq_min >= freq_max:
raise ValueError("Min frequency must be less than max")
except (ValueError, TypeError) as e:
return jsonify({'status': 'error', 'message': f'Invalid frequency range: {e}'}), 400
bias_t = data.get('bias_t', False)
ppm = int(data.get('ppm', 0))
# Find auto_rx
auto_rx_path = find_auto_rx()
if not auto_rx_path:
return jsonify({
'status': 'error',
'message': 'radiosonde_auto_rx not found. Install from https://github.com/projecthorus/radiosonde_auto_rx',
}), 400
# Get SDR type
sdr_type_str = data.get('sdr_type', 'rtlsdr')
# Kill any existing process
if app_module.radiosonde_process:
try:
pgid = os.getpgid(app_module.radiosonde_process.pid)
os.killpg(pgid, 15)
app_module.radiosonde_process.wait(timeout=PROCESS_TERMINATE_TIMEOUT)
except (subprocess.TimeoutExpired, ProcessLookupError, OSError):
try:
pgid = os.getpgid(app_module.radiosonde_process.pid)
os.killpg(pgid, 9)
except (ProcessLookupError, OSError):
pass
app_module.radiosonde_process = None
logger.info("Killed existing radiosonde process")
# Claim SDR device
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'radiosonde', sdr_type_str)
if error:
return jsonify({
'status': 'error',
'error_type': 'DEVICE_BUSY',
'message': error,
}), 409
# Generate config
cfg_path = generate_station_cfg(
freq_min=freq_min,
freq_max=freq_max,
gain=gain,
device_index=device_int,
ppm=ppm,
bias_t=bias_t,
)
# Build command
cfg_dir = os.path.dirname(os.path.abspath(cfg_path))
if auto_rx_path.endswith('.py'):
cmd = ['python', auto_rx_path, '-c', cfg_dir]
else:
cmd = [auto_rx_path, '-c', cfg_dir]
try:
logger.info(f"Starting radiosonde_auto_rx: {' '.join(cmd)}")
app_module.radiosonde_process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
start_new_session=True,
)
# Wait briefly for process to start
time.sleep(2.0)
if app_module.radiosonde_process.poll() is not None:
app_module.release_sdr_device(device_int, sdr_type_str)
stderr_output = ''
if app_module.radiosonde_process.stderr:
try:
stderr_output = app_module.radiosonde_process.stderr.read().decode(
'utf-8', errors='ignore'
).strip()
except Exception:
pass
error_msg = 'radiosonde_auto_rx failed to start. Check SDR device connection.'
if stderr_output:
error_msg += f' Error: {stderr_output[:200]}'
return jsonify({'status': 'error', 'message': error_msg}), 500
radiosonde_running = True
radiosonde_active_device = device_int
radiosonde_active_sdr_type = sdr_type_str
# Clear stale data
with _balloons_lock:
radiosonde_balloons.clear()
# Start UDP listener thread
udp_thread = threading.Thread(
target=parse_radiosonde_udp,
args=(RADIOSONDE_UDP_PORT,),
daemon=True,
)
udp_thread.start()
return jsonify({
'status': 'started',
'message': 'Radiosonde tracking started',
'device': device,
})
except Exception as e:
app_module.release_sdr_device(device_int, sdr_type_str)
logger.error(f"Failed to start radiosonde_auto_rx: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@radiosonde_bp.route('/stop', methods=['POST'])
def stop_radiosonde():
"""Stop radiosonde tracking."""
global radiosonde_running, radiosonde_active_device, radiosonde_active_sdr_type, _udp_socket
with app_module.radiosonde_lock:
if app_module.radiosonde_process:
try:
pgid = os.getpgid(app_module.radiosonde_process.pid)
os.killpg(pgid, 15)
app_module.radiosonde_process.wait(timeout=RADIOSONDE_TERMINATE_TIMEOUT)
except (subprocess.TimeoutExpired, ProcessLookupError, OSError):
try:
pgid = os.getpgid(app_module.radiosonde_process.pid)
os.killpg(pgid, 9)
except (ProcessLookupError, OSError):
pass
app_module.radiosonde_process = None
logger.info("Radiosonde process stopped")
# Close UDP socket to unblock listener thread
if _udp_socket:
try:
_udp_socket.close()
except OSError:
pass
_udp_socket = None
# Release SDR device
if radiosonde_active_device is not None:
app_module.release_sdr_device(
radiosonde_active_device,
radiosonde_active_sdr_type or 'rtlsdr',
)
radiosonde_running = False
radiosonde_active_device = None
radiosonde_active_sdr_type = None
with _balloons_lock:
radiosonde_balloons.clear()
return jsonify({'status': 'stopped'})
@radiosonde_bp.route('/stream')
def stream_radiosonde():
"""SSE stream for radiosonde telemetry."""
response = Response(
sse_stream_fanout(
source_queue=app_module.radiosonde_queue,
channel_key='radiosonde',
timeout=SSE_QUEUE_TIMEOUT,
keepalive_interval=SSE_KEEPALIVE_INTERVAL,
),
mimetype='text/event-stream',
)
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
return response
@radiosonde_bp.route('/balloons')
def get_balloons():
"""Get current balloon data."""
with _balloons_lock:
return jsonify({
'status': 'success',
'count': len(radiosonde_balloons),
'balloons': dict(radiosonde_balloons),
})