"""APRS amateur radio position reporting routes.""" from __future__ import annotations import csv import json import os import queue import re import shutil import subprocess import tempfile import threading import time from datetime import datetime from subprocess import DEVNULL, PIPE, STDOUT from typing import Generator, Optional from flask import Blueprint, jsonify, request, Response import app as app_module from utils.logging import sensor_logger as logger from utils.validation import validate_device_index, validate_gain, validate_ppm from utils.sse import format_sse from utils.constants import ( PROCESS_TERMINATE_TIMEOUT, SSE_KEEPALIVE_INTERVAL, SSE_QUEUE_TIMEOUT, PROCESS_START_WAIT, ) aprs_bp = Blueprint('aprs', __name__, url_prefix='/aprs') # APRS frequencies by region (MHz) APRS_FREQUENCIES = { 'north_america': '144.390', 'europe': '144.800', 'uk': '144.800', 'australia': '145.175', 'new_zealand': '144.575', 'argentina': '144.930', 'brazil': '145.570', 'japan': '144.640', 'china': '144.640', } # Statistics aprs_packet_count = 0 aprs_station_count = 0 aprs_last_packet_time = None aprs_stations = {} # callsign -> station data # Meter rate limiting _last_meter_time = 0.0 _last_meter_level = -1 METER_MIN_INTERVAL = 0.1 # Max 10 updates/sec METER_MIN_CHANGE = 2 # Only send if level changes by at least this much def find_direwolf() -> Optional[str]: """Find direwolf binary.""" return shutil.which('direwolf') def find_multimon_ng() -> Optional[str]: """Find multimon-ng binary.""" return shutil.which('multimon-ng') def find_rtl_fm() -> Optional[str]: """Find rtl_fm binary.""" return shutil.which('rtl_fm') def find_rtl_power() -> Optional[str]: """Find rtl_power binary for spectrum scanning.""" return shutil.which('rtl_power') # Path to direwolf config file DIREWOLF_CONFIG_PATH = os.path.join(tempfile.gettempdir(), 'intercept_direwolf.conf') def create_direwolf_config() -> str: """Create a minimal direwolf config for receive-only operation.""" config = """# Minimal direwolf config for INTERCEPT (receive-only) # Audio input is handled via stdin ADEVICE stdin null CHANNEL 0 MYCALL N0CALL MODEM 1200 """ with open(DIREWOLF_CONFIG_PATH, 'w') as f: f.write(config) return DIREWOLF_CONFIG_PATH def parse_aprs_packet(raw_packet: str) -> Optional[dict]: """Parse APRS packet into structured data.""" try: # Basic APRS packet format: CALLSIGN>PATH:DATA # Example: N0CALL-9>APRS,TCPIP*:@092345z4903.50N/07201.75W_090/000g005t077 match = re.match(r'^([A-Z0-9-]+)>([^:]+):(.+)$', raw_packet, re.IGNORECASE) if not match: return None callsign = match.group(1).upper() path = match.group(2) data = match.group(3) packet = { 'type': 'aprs', 'callsign': callsign, 'path': path, 'raw': raw_packet, 'timestamp': datetime.utcnow().isoformat() + 'Z', } # Determine packet type and parse accordingly if data.startswith('!') or data.startswith('='): # Position without timestamp packet['packet_type'] = 'position' pos = parse_position(data[1:]) if pos: packet.update(pos) elif data.startswith('/') or data.startswith('@'): # Position with timestamp packet['packet_type'] = 'position' # Skip timestamp (7 chars) and parse position if len(data) > 8: pos = parse_position(data[8:]) if pos: packet.update(pos) elif data.startswith('>'): # Status message packet['packet_type'] = 'status' packet['status'] = data[1:] elif data.startswith(':'): # Message packet['packet_type'] = 'message' msg_match = re.match(r'^:([A-Z0-9 -]{9}):(.*)$', data, re.IGNORECASE) if msg_match: packet['addressee'] = msg_match.group(1).strip() packet['message'] = msg_match.group(2) elif data.startswith('_'): # Weather report (Positionless) packet['packet_type'] = 'weather' packet['weather'] = parse_weather(data) elif data.startswith(';'): # Object packet['packet_type'] = 'object' elif data.startswith(')'): # Item packet['packet_type'] = 'item' elif data.startswith('T'): # Telemetry packet['packet_type'] = 'telemetry' else: packet['packet_type'] = 'other' packet['data'] = data return packet except Exception as e: logger.debug(f"Failed to parse APRS packet: {e}") return None def parse_position(data: str) -> Optional[dict]: """Parse APRS position data.""" try: # Format: DDMM.mmN/DDDMM.mmW (or similar with symbols) # Example: 4903.50N/07201.75W pos_match = re.match( r'^(\d{2})(\d{2}\.\d+)([NS])(.)(\d{3})(\d{2}\.\d+)([EW])(.)?', data ) if pos_match: lat_deg = int(pos_match.group(1)) lat_min = float(pos_match.group(2)) lat_dir = pos_match.group(3) symbol_table = pos_match.group(4) lon_deg = int(pos_match.group(5)) lon_min = float(pos_match.group(6)) lon_dir = pos_match.group(7) symbol_code = pos_match.group(8) or '' lat = lat_deg + lat_min / 60.0 if lat_dir == 'S': lat = -lat lon = lon_deg + lon_min / 60.0 if lon_dir == 'W': lon = -lon result = { 'lat': round(lat, 6), 'lon': round(lon, 6), 'symbol': symbol_table + symbol_code, } # Parse additional data after position (course/speed, altitude, etc.) remaining = data[18:] if len(data) > 18 else '' # Course/Speed: CCC/SSS cs_match = re.search(r'(\d{3})/(\d{3})', remaining) if cs_match: result['course'] = int(cs_match.group(1)) result['speed'] = int(cs_match.group(2)) # knots # Altitude: /A=NNNNNN alt_match = re.search(r'/A=(-?\d+)', remaining) if alt_match: result['altitude'] = int(alt_match.group(1)) # feet return result except Exception as e: logger.debug(f"Failed to parse position: {e}") return None def parse_weather(data: str) -> dict: """Parse APRS weather data.""" weather = {} # Wind direction: cCCC match = re.search(r'c(\d{3})', data) if match: weather['wind_direction'] = int(match.group(1)) # Wind speed: sSSS (mph) match = re.search(r's(\d{3})', data) if match: weather['wind_speed'] = int(match.group(1)) # Wind gust: gGGG (mph) match = re.search(r'g(\d{3})', data) if match: weather['wind_gust'] = int(match.group(1)) # Temperature: tTTT (Fahrenheit) match = re.search(r't(-?\d{2,3})', data) if match: weather['temperature'] = int(match.group(1)) # Rain last hour: rRRR (hundredths of inch) match = re.search(r'r(\d{3})', data) if match: weather['rain_1h'] = int(match.group(1)) / 100.0 # Rain last 24h: pPPP match = re.search(r'p(\d{3})', data) if match: weather['rain_24h'] = int(match.group(1)) / 100.0 # Humidity: hHH (%) match = re.search(r'h(\d{2})', data) if match: h = int(match.group(1)) weather['humidity'] = 100 if h == 0 else h # Barometric pressure: bBBBBB (tenths of millibars) match = re.search(r'b(\d{5})', data) if match: weather['pressure'] = int(match.group(1)) / 10.0 return weather def parse_audio_level(line: str) -> Optional[int]: """Parse direwolf audio level line and return normalized level (0-100). Direwolf outputs lines like: Audio level = 34(18/16) [NONE] __||||||______ [0.4] Audio level = 57(34/32) [NONE] __||||||||||||______ The first number after "Audio level = " is the main level indicator. We normalize it to 0-100 scale (direwolf typically outputs 0-100+). """ # Match "Audio level = NN" pattern match = re.search(r'Audio level\s*=\s*(\d+)', line, re.IGNORECASE) if match: raw_level = int(match.group(1)) # Normalize: direwolf levels are typically 0-100, but can go higher # Clamp to 0-100 range normalized = min(max(raw_level, 0), 100) return normalized return None def should_send_meter_update(level: int) -> bool: """Rate-limit meter updates to avoid spamming SSE. Only send if: - At least METER_MIN_INTERVAL seconds have passed, OR - Level changed by at least METER_MIN_CHANGE """ global _last_meter_time, _last_meter_level now = time.time() time_ok = (now - _last_meter_time) >= METER_MIN_INTERVAL change_ok = abs(level - _last_meter_level) >= METER_MIN_CHANGE if time_ok or change_ok: _last_meter_time = now _last_meter_level = level return True return False def stream_aprs_output(rtl_process: subprocess.Popen, decoder_process: subprocess.Popen) -> None: """Stream decoded APRS packets and audio level meter to queue. This function reads from the decoder's stdout (text mode, line-buffered). The decoder's stderr is merged into stdout (STDOUT) to avoid deadlocks. rtl_fm's stderr is sent to DEVNULL for the same reason. Outputs two types of messages to the queue: - type='aprs': Decoded APRS packets - type='meter': Audio level meter readings (rate-limited) """ global aprs_packet_count, aprs_station_count, aprs_last_packet_time, aprs_stations global _last_meter_time, _last_meter_level # Reset meter state _last_meter_time = 0.0 _last_meter_level = -1 try: app_module.aprs_queue.put({'type': 'status', 'status': 'started'}) # Read line-by-line in text mode. Empty string '' signals EOF. for line in iter(decoder_process.stdout.readline, ''): line = line.strip() if not line: continue # Check for audio level line first (for signal meter) audio_level = parse_audio_level(line) if audio_level is not None: if should_send_meter_update(audio_level): meter_msg = { 'type': 'meter', 'level': audio_level, 'ts': datetime.utcnow().isoformat() + 'Z' } app_module.aprs_queue.put(meter_msg) continue # Audio level lines are not packets # multimon-ng prefixes decoded packets with "AFSK1200: " if line.startswith('AFSK1200:'): line = line[9:].strip() # direwolf often prefixes packets with "[0.4] " or similar audio level indicator # Strip any leading bracket prefix like "[0.4] " before parsing line = re.sub(r'^\[\d+\.\d+\]\s*', '', line) # Skip non-packet lines (APRS format: CALL>PATH:DATA) if '>' not in line or ':' not in line: continue packet = parse_aprs_packet(line) if packet: aprs_packet_count += 1 aprs_last_packet_time = time.time() # Track unique stations callsign = packet.get('callsign') if callsign and callsign not in aprs_stations: aprs_station_count += 1 # Update station data if callsign: aprs_stations[callsign] = { 'callsign': callsign, 'lat': packet.get('lat'), 'lon': packet.get('lon'), 'symbol': packet.get('symbol'), 'last_seen': packet.get('timestamp'), 'packet_type': packet.get('packet_type'), } app_module.aprs_queue.put(packet) # Log if enabled if app_module.logging_enabled: try: with open(app_module.log_file_path, 'a') as f: ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S') f.write(f"{ts} | APRS | {json.dumps(packet)}\n") except Exception: pass except Exception as e: logger.error(f"APRS stream error: {e}") app_module.aprs_queue.put({'type': 'error', 'message': str(e)}) finally: app_module.aprs_queue.put({'type': 'status', 'status': 'stopped'}) # Cleanup processes for proc in [rtl_process, decoder_process]: try: proc.terminate() proc.wait(timeout=2) except Exception: try: proc.kill() except Exception: pass @aprs_bp.route('/tools') def check_aprs_tools() -> Response: """Check for APRS decoding tools.""" has_rtl_fm = find_rtl_fm() is not None has_direwolf = find_direwolf() is not None has_multimon = find_multimon_ng() is not None return jsonify({ 'rtl_fm': has_rtl_fm, 'direwolf': has_direwolf, 'multimon_ng': has_multimon, 'ready': has_rtl_fm and (has_direwolf or has_multimon), 'decoder': 'direwolf' if has_direwolf else ('multimon-ng' if has_multimon else None) }) @aprs_bp.route('/status') def aprs_status() -> Response: """Get APRS decoder status.""" running = False if app_module.aprs_process: running = app_module.aprs_process.poll() is None return jsonify({ 'running': running, 'packet_count': aprs_packet_count, 'station_count': aprs_station_count, 'last_packet_time': aprs_last_packet_time, 'queue_size': app_module.aprs_queue.qsize() }) @aprs_bp.route('/stations') def get_stations() -> Response: """Get all tracked APRS stations.""" return jsonify({ 'stations': list(aprs_stations.values()), 'count': len(aprs_stations) }) @aprs_bp.route('/start', methods=['POST']) def start_aprs() -> Response: """Start APRS decoder.""" global aprs_packet_count, aprs_station_count, aprs_last_packet_time, aprs_stations with app_module.aprs_lock: if app_module.aprs_process and app_module.aprs_process.poll() is None: return jsonify({ 'status': 'error', 'message': 'APRS decoder already running' }), 409 # Check for required tools rtl_fm_path = find_rtl_fm() if not rtl_fm_path: return jsonify({ 'status': 'error', 'message': 'rtl_fm not found. Install with: sudo apt install rtl-sdr' }), 400 # Check for decoder (prefer direwolf, fallback to multimon-ng) direwolf_path = find_direwolf() multimon_path = find_multimon_ng() if not direwolf_path and not multimon_path: return jsonify({ 'status': 'error', 'message': 'No APRS decoder found. Install direwolf or multimon-ng' }), 400 data = request.json or {} # Validate inputs try: device = validate_device_index(data.get('device', '0')) gain = validate_gain(data.get('gain', '40')) ppm = validate_ppm(data.get('ppm', '0')) except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 # Get frequency for region region = data.get('region', 'north_america') frequency = APRS_FREQUENCIES.get(region, '144.390') # Allow custom frequency override if data.get('frequency'): frequency = data.get('frequency') # Clear queue and reset stats while not app_module.aprs_queue.empty(): try: app_module.aprs_queue.get_nowait() except queue.Empty: break aprs_packet_count = 0 aprs_station_count = 0 aprs_last_packet_time = None aprs_stations = {} # Build rtl_fm command for APRS (narrowband FM at 22050 Hz for AFSK1200) freq_hz = f"{float(frequency)}M" rtl_cmd = [ rtl_fm_path, '-f', freq_hz, '-M', 'nfm', # Narrowband FM for APRS '-s', '22050', # Sample rate matching direwolf -r 22050 '-E', 'dc', # Enable DC blocking filter for cleaner audio '-A', 'fast', # Fast AGC for packet bursts '-d', str(device), ] # Gain: 0 means auto, otherwise set specific gain if gain and str(gain) != '0': rtl_cmd.extend(['-g', str(gain)]) # PPM frequency correction if ppm and str(ppm) != '0': rtl_cmd.extend(['-p', str(ppm)]) # Output raw audio to stdout rtl_cmd.append('-') # Build decoder command if direwolf_path: # Create minimal config file for direwolf config_path = create_direwolf_config() # direwolf flags for receiving AFSK1200 from stdin: # -c config = config file path (must come before other options) # -n 1 = mono audio channel # -r 22050 = sample rate (must match rtl_fm -s) # -b 16 = 16-bit signed samples # -t 0 = disable text colors (for cleaner parsing) # NOTE: We do NOT use -q h here so we get audio level lines for the signal meter # - = read audio from stdin (must be last argument) decoder_cmd = [ direwolf_path, '-c', config_path, '-n', '1', '-r', '22050', '-b', '16', '-t', '0', '-' ] decoder_name = 'direwolf' else: # Fallback to multimon-ng decoder_cmd = [multimon_path, '-t', 'raw', '-a', 'AFSK1200', '-'] decoder_name = 'multimon-ng' logger.info(f"Starting APRS decoder: {' '.join(rtl_cmd)} | {' '.join(decoder_cmd)}") try: # Start rtl_fm with stdout piped to decoder. # stderr goes to DEVNULL to prevent blocking (rtl_fm logs to stderr). # NOTE: RTL-SDR Blog V4 may show offset-tuned frequency in logs - this is normal. rtl_process = subprocess.Popen( rtl_cmd, stdout=PIPE, stderr=DEVNULL, start_new_session=True ) # Start decoder with stdin wired to rtl_fm's stdout. # Use text mode with line buffering for reliable line-by-line reading. # Merge stderr into stdout to avoid blocking on unbuffered stderr. decoder_process = subprocess.Popen( decoder_cmd, stdin=rtl_process.stdout, stdout=PIPE, stderr=STDOUT, text=True, bufsize=1, start_new_session=True ) # Close rtl_fm's stdout in parent so decoder owns it exclusively. # This ensures proper EOF propagation when rtl_fm terminates. rtl_process.stdout.close() # Wait briefly to check if processes started successfully time.sleep(PROCESS_START_WAIT) if rtl_process.poll() is not None: # rtl_fm exited early - something went wrong error_msg = f'rtl_fm failed to start (exit code {rtl_process.returncode})' logger.error(error_msg) try: decoder_process.kill() except Exception: pass return jsonify({'status': 'error', 'message': error_msg}), 500 if decoder_process.poll() is not None: # Decoder exited early - capture any output error_output = decoder_process.stdout.read()[:500] if decoder_process.stdout else '' error_msg = f'{decoder_name} failed to start' if error_output: error_msg += f': {error_output}' logger.error(error_msg) try: rtl_process.kill() except Exception: pass return jsonify({'status': 'error', 'message': error_msg}), 500 # Store references for status checks and cleanup app_module.aprs_process = decoder_process app_module.aprs_rtl_process = rtl_process # Start background thread to read decoder output and push to queue thread = threading.Thread( target=stream_aprs_output, args=(rtl_process, decoder_process), daemon=True ) thread.start() return jsonify({ 'status': 'started', 'frequency': frequency, 'region': region, 'device': device, 'decoder': decoder_name }) except Exception as e: logger.error(f"Failed to start APRS decoder: {e}") return jsonify({'status': 'error', 'message': str(e)}), 500 @aprs_bp.route('/stop', methods=['POST']) def stop_aprs() -> Response: """Stop APRS decoder.""" with app_module.aprs_lock: processes_to_stop = [] if hasattr(app_module, 'aprs_rtl_process') and app_module.aprs_rtl_process: processes_to_stop.append(app_module.aprs_rtl_process) if app_module.aprs_process: processes_to_stop.append(app_module.aprs_process) if not processes_to_stop: return jsonify({ 'status': 'error', 'message': 'APRS decoder not running' }), 400 for proc in processes_to_stop: try: proc.terminate() proc.wait(timeout=PROCESS_TERMINATE_TIMEOUT) except subprocess.TimeoutExpired: proc.kill() except Exception as e: logger.error(f"Error stopping APRS process: {e}") app_module.aprs_process = None if hasattr(app_module, 'aprs_rtl_process'): app_module.aprs_rtl_process = None return jsonify({'status': 'stopped'}) @aprs_bp.route('/stream') def stream_aprs() -> Response: """SSE stream for APRS packets.""" def generate() -> Generator[str, None, None]: last_keepalive = time.time() while True: try: msg = app_module.aprs_queue.get(timeout=SSE_QUEUE_TIMEOUT) last_keepalive = time.time() yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL: yield format_sse({'type': 'keepalive'}) last_keepalive = now response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' response.headers['X-Accel-Buffering'] = 'no' return response @aprs_bp.route('/frequencies') def get_frequencies() -> Response: """Get APRS frequencies by region.""" return jsonify(APRS_FREQUENCIES) @aprs_bp.route('/spectrum', methods=['GET', 'POST']) def scan_aprs_spectrum() -> Response: """Scan spectrum around APRS frequency for signal visibility debugging. This endpoint runs rtl_power briefly to detect signal activity near the APRS frequency. Useful for headless/remote debugging to verify antenna and SDR are receiving signals. Query params or JSON body: device: SDR device index (default: 0) gain: Gain in dB, 0=auto (default: 0) region: Region for frequency lookup (default: europe) frequency: Override frequency in MHz (optional) duration: Scan duration in seconds (default: 10, max: 60) Returns JSON with peak detection and signal analysis. """ rtl_power_path = find_rtl_power() if not rtl_power_path: return jsonify({ 'status': 'error', 'message': 'rtl_power not found. Install with: sudo apt install rtl-sdr' }), 400 # Get parameters from JSON body or query args if request.is_json: data = request.json or {} else: data = {} device = data.get('device', request.args.get('device', '0')) gain = data.get('gain', request.args.get('gain', '0')) region = data.get('region', request.args.get('region', 'europe')) frequency = data.get('frequency', request.args.get('frequency')) duration = data.get('duration', request.args.get('duration', '10')) # Validate inputs try: device = validate_device_index(device) gain = validate_gain(gain) duration = min(max(int(duration), 5), 60) # Clamp 5-60 seconds except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 # Get center frequency if frequency: center_freq_mhz = float(frequency) else: center_freq_mhz = float(APRS_FREQUENCIES.get(region, '144.800')) # Scan 20 kHz around center frequency (±10 kHz) start_freq_mhz = center_freq_mhz - 0.010 end_freq_mhz = center_freq_mhz + 0.010 bin_size_hz = 200 # 200 Hz bins for good resolution # Create temp file for rtl_power output tmp_file = os.path.join(tempfile.gettempdir(), f'intercept_rtl_power_{os.getpid()}.csv') try: # Build rtl_power command # Format: rtl_power -f start:end:bin_size -d device -g gain -i interval -e duration output_file rtl_power_cmd = [ rtl_power_path, '-f', f'{start_freq_mhz}M:{end_freq_mhz}M:{bin_size_hz}', '-d', str(device), '-i', '1', # 1 second integration '-e', f'{duration}s', ] # Gain: 0 means auto if gain and str(gain) != '0': rtl_power_cmd.extend(['-g', str(gain)]) rtl_power_cmd.append(tmp_file) logger.info(f"Running spectrum scan: {' '.join(rtl_power_cmd)}") # Run rtl_power with timeout result = subprocess.run( rtl_power_cmd, capture_output=True, text=True, timeout=duration + 15 # Allow extra time for startup/shutdown ) if result.returncode != 0: error_msg = result.stderr[:200] if result.stderr else f'Exit code {result.returncode}' return jsonify({ 'status': 'error', 'message': f'rtl_power failed: {error_msg}' }), 500 # Parse rtl_power CSV output # Format: date, time, start_hz, end_hz, step_hz, samples, db1, db2, db3, ... if not os.path.exists(tmp_file): return jsonify({ 'status': 'error', 'message': 'rtl_power did not produce output file' }), 500 bins = [] with open(tmp_file, 'r') as f: reader = csv.reader(f) for row in reader: if len(row) < 7: continue try: row_start_hz = float(row[2]) row_step_hz = float(row[4]) # dB values start at column 6 for i, db_str in enumerate(row[6:]): db_val = float(db_str.strip()) freq_hz = row_start_hz + (i * row_step_hz) bins.append({'freq_hz': freq_hz, 'db': db_val}) except (ValueError, IndexError): continue if not bins: return jsonify({ 'status': 'error', 'message': 'No spectrum data collected. Check SDR connection and antenna.' }), 500 # Calculate statistics db_values = [b['db'] for b in bins] avg_db = sum(db_values) / len(db_values) max_bin = max(bins, key=lambda x: x['db']) min_db = min(db_values) # Find peak near center frequency (within 5 kHz) center_hz = center_freq_mhz * 1e6 near_center_bins = [b for b in bins if abs(b['freq_hz'] - center_hz) < 5000] if near_center_bins: peak_near_center = max(near_center_bins, key=lambda x: x['db']) else: peak_near_center = max_bin # Signal analysis peak_above_noise = peak_near_center['db'] - avg_db signal_detected = peak_above_noise > 3 # 3 dB above noise floor # Generate advice if peak_above_noise < 1: advice = "No signal detected near APRS frequency. Check antenna connection and orientation." elif peak_above_noise < 3: advice = "Weak signal detected. Consider improving antenna or reducing noise sources." elif peak_above_noise < 6: advice = "Moderate signal detected. Decoding should work for strong stations." else: advice = "Good signal detected. Decoding should work well." return jsonify({ 'status': 'success', 'scan_params': { 'center_freq_mhz': center_freq_mhz, 'start_freq_mhz': start_freq_mhz, 'end_freq_mhz': end_freq_mhz, 'bin_size_hz': bin_size_hz, 'duration_seconds': duration, 'device': device, 'gain': gain, 'region': region, }, 'results': { 'total_bins': len(bins), 'noise_floor_db': round(avg_db, 1), 'min_db': round(min_db, 1), 'peak_freq_mhz': round(max_bin['freq_hz'] / 1e6, 6), 'peak_db': round(max_bin['db'], 1), 'peak_near_aprs_freq_mhz': round(peak_near_center['freq_hz'] / 1e6, 6), 'peak_near_aprs_db': round(peak_near_center['db'], 1), 'signal_above_noise_db': round(peak_above_noise, 1), 'signal_detected': signal_detected, }, 'advice': advice, }) except subprocess.TimeoutExpired: return jsonify({ 'status': 'error', 'message': f'Spectrum scan timed out after {duration + 15} seconds' }), 500 except Exception as e: logger.error(f"Spectrum scan error: {e}") return jsonify({'status': 'error', 'message': str(e)}), 500 finally: # Cleanup temp file try: if os.path.exists(tmp_file): os.remove(tmp_file) except Exception: pass