fix: APRS 15-minute startup delay caused by pipe buffering

Switch direwolf subprocess output from PIPE to PTY (pseudo-terminal),
forcing line-buffered output so packets arrive immediately instead of
waiting for a 4-8KB pipe buffer to fill. Matches the proven pattern
used by pager mode.

Also enhances direwolf config with FIX_BITS error correction and
disables unused AGWPE/KISS server ports.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-27 08:43:54 +00:00
parent f9dc54cc3b
commit 0d13638d70

View File

@@ -5,8 +5,10 @@ from __future__ import annotations
import csv
import json
import os
import pty
import queue
import re
import select
import shutil
import subprocess
import tempfile
@@ -103,6 +105,9 @@ ADEVICE stdin null
CHANNEL 0
MYCALL N0CALL
MODEM 1200
FIX_BITS 1
AGWPORT 0
KISSPORT 0
"""
with open(DIREWOLF_CONFIG_PATH, 'w') as f:
f.write(config)
@@ -1437,12 +1442,12 @@ def should_send_meter_update(level: int) -> bool:
return False
def stream_aprs_output(rtl_process: subprocess.Popen, decoder_process: subprocess.Popen) -> None:
def stream_aprs_output(master_fd: int, rtl_process: subprocess.Popen, decoder_process: subprocess.Popen) -> None:
"""Stream decoded APRS packets and audio level meter to queue.
This function reads from the decoder's stdout (text mode, line-buffered).
The decoder's stderr is merged into stdout (STDOUT) to avoid deadlocks.
rtl_fm's stderr is captured via PIPE with a monitor thread.
Reads from a PTY master fd to get line-buffered output from the decoder,
avoiding the 15-minute pipe buffering delay. Uses select() + os.read()
to poll the PTY (same pattern as pager.py).
Outputs two types of messages to the queue:
- type='aprs': Decoded APRS packets
@@ -1462,93 +1467,114 @@ def stream_aprs_output(rtl_process: subprocess.Popen, decoder_process: subproces
try:
app_module.aprs_queue.put({'type': 'status', 'status': 'started'})
# Read line-by-line in binary mode. Empty bytes b'' signals EOF.
# Decode with errors='replace' so corrupted radio bytes (e.g. 0xf7)
# never crash the stream.
for raw in iter(decoder_process.stdout.readline, b''):
line = raw.decode('utf-8', errors='replace').strip()
if not line:
continue
# Read from PTY using select() for non-blocking reads.
# PTY forces the decoder to line-buffer, so output arrives immediately
# instead of waiting for a full 4-8KB pipe buffer to fill.
buffer = ""
while True:
try:
ready, _, _ = select.select([master_fd], [], [], 1.0)
except Exception:
break
# Check for audio level line first (for signal meter)
audio_level = parse_audio_level(line)
if audio_level is not None:
if should_send_meter_update(audio_level):
meter_msg = {
'type': 'meter',
'level': audio_level,
'ts': datetime.utcnow().isoformat() + 'Z'
}
app_module.aprs_queue.put(meter_msg)
continue # Audio level lines are not packets
if ready:
try:
data = os.read(master_fd, 1024)
if not data:
break
buffer += data.decode('utf-8', errors='replace')
except OSError:
break
# Normalize decoder prefixes (multimon/direwolf) before parsing.
line = normalize_aprs_output_line(line)
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if not line:
continue
# Skip non-packet lines (APRS format: CALL>PATH:DATA)
if '>' not in line or ':' not in line:
continue
# Check for audio level line first (for signal meter)
audio_level = parse_audio_level(line)
if audio_level is not None:
if should_send_meter_update(audio_level):
meter_msg = {
'type': 'meter',
'level': audio_level,
'ts': datetime.utcnow().isoformat() + 'Z'
}
app_module.aprs_queue.put(meter_msg)
continue # Audio level lines are not packets
packet = parse_aprs_packet(line)
if packet:
aprs_packet_count += 1
aprs_last_packet_time = time.time()
# Normalize decoder prefixes (multimon/direwolf) before parsing.
line = normalize_aprs_output_line(line)
# Track unique stations
callsign = packet.get('callsign')
if callsign and callsign not in aprs_stations:
aprs_station_count += 1
# Skip non-packet lines (APRS format: CALL>PATH:DATA)
if '>' not in line or ':' not in line:
continue
# Update station data, preserving last known coordinates when
# packets do not contain position fields.
if callsign:
existing = aprs_stations.get(callsign, {})
packet_lat = packet.get('lat')
packet_lon = packet.get('lon')
aprs_stations[callsign] = {
'callsign': callsign,
'lat': packet_lat if packet_lat is not None else existing.get('lat'),
'lon': packet_lon if packet_lon is not None else existing.get('lon'),
'symbol': packet.get('symbol') or existing.get('symbol'),
'last_seen': packet.get('timestamp'),
'packet_type': packet.get('packet_type'),
}
# Geofence check
_aprs_lat = packet_lat
_aprs_lon = packet_lon
if _aprs_lat is not None and _aprs_lon is not None:
try:
from utils.geofence import get_geofence_manager
for _gf_evt in get_geofence_manager().check_position(
callsign, 'aprs_station', _aprs_lat, _aprs_lon,
{'callsign': callsign}
):
process_event('aprs', _gf_evt, 'geofence')
except Exception:
pass
# Evict oldest stations when limit is exceeded
if len(aprs_stations) > APRS_MAX_STATIONS:
oldest = min(
aprs_stations,
key=lambda k: aprs_stations[k].get('last_seen', ''),
)
del aprs_stations[oldest]
packet = parse_aprs_packet(line)
if packet:
aprs_packet_count += 1
aprs_last_packet_time = time.time()
app_module.aprs_queue.put(packet)
# Track unique stations
callsign = packet.get('callsign')
if callsign and callsign not in aprs_stations:
aprs_station_count += 1
# Log if enabled
if app_module.logging_enabled:
try:
with open(app_module.log_file_path, 'a') as f:
ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"{ts} | APRS | {json.dumps(packet)}\n")
except Exception:
pass
# Update station data, preserving last known coordinates when
# packets do not contain position fields.
if callsign:
existing = aprs_stations.get(callsign, {})
packet_lat = packet.get('lat')
packet_lon = packet.get('lon')
aprs_stations[callsign] = {
'callsign': callsign,
'lat': packet_lat if packet_lat is not None else existing.get('lat'),
'lon': packet_lon if packet_lon is not None else existing.get('lon'),
'symbol': packet.get('symbol') or existing.get('symbol'),
'last_seen': packet.get('timestamp'),
'packet_type': packet.get('packet_type'),
}
# Geofence check
_aprs_lat = packet_lat
_aprs_lon = packet_lon
if _aprs_lat is not None and _aprs_lon is not None:
try:
from utils.geofence import get_geofence_manager
for _gf_evt in get_geofence_manager().check_position(
callsign, 'aprs_station', _aprs_lat, _aprs_lon,
{'callsign': callsign}
):
process_event('aprs', _gf_evt, 'geofence')
except Exception:
pass
# Evict oldest stations when limit is exceeded
if len(aprs_stations) > APRS_MAX_STATIONS:
oldest = min(
aprs_stations,
key=lambda k: aprs_stations[k].get('last_seen', ''),
)
del aprs_stations[oldest]
app_module.aprs_queue.put(packet)
# Log if enabled
if app_module.logging_enabled:
try:
with open(app_module.log_file_path, 'a') as f:
ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"{ts} | APRS | {json.dumps(packet)}\n")
except Exception:
pass
except Exception as e:
logger.error(f"APRS stream error: {e}")
app_module.aprs_queue.put({'type': 'error', 'message': str(e)})
finally:
try:
os.close(master_fd)
except OSError:
pass
app_module.aprs_queue.put({'type': 'status', 'status': 'stopped'})
# Cleanup processes
for proc in [rtl_process, decoder_process]:
@@ -1785,19 +1811,25 @@ def start_aprs() -> Response:
rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True)
rtl_stderr_thread.start()
# Create a pseudo-terminal for decoder output. PTY forces the
# decoder to line-buffer its stdout, avoiding the 15-minute delay
# caused by full pipe buffering (~4-8KB) on small APRS packets.
master_fd, slave_fd = pty.openpty()
# Start decoder with stdin wired to rtl_fm's stdout.
# Use binary mode to avoid UnicodeDecodeError on raw/corrupted bytes
# from the radio decoder (e.g. 0xf7). Lines are decoded manually
# in stream_aprs_output with errors='replace'.
# Merge stderr into stdout to avoid blocking on unbuffered stderr.
# stdout/stderr go to the PTY slave so output is line-buffered.
decoder_process = subprocess.Popen(
decoder_cmd,
stdin=rtl_process.stdout,
stdout=PIPE,
stderr=STDOUT,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True,
start_new_session=True
)
# Close slave fd in parent — decoder owns it now.
os.close(slave_fd)
# Close rtl_fm's stdout in parent so decoder owns it exclusively.
# This ensures proper EOF propagation when rtl_fm terminates.
rtl_process.stdout.close()
@@ -1818,6 +1850,10 @@ def start_aprs() -> Response:
if stderr_output:
error_msg += f': {stderr_output[:200]}'
logger.error(error_msg)
try:
os.close(master_fd)
except OSError:
pass
try:
decoder_process.kill()
except Exception:
@@ -1828,13 +1864,23 @@ def start_aprs() -> Response:
return jsonify({'status': 'error', 'message': error_msg}), 500
if decoder_process.poll() is not None:
# Decoder exited early - capture any output
raw_output = decoder_process.stdout.read()[:500] if decoder_process.stdout else b''
error_output = raw_output.decode('utf-8', errors='replace') if raw_output else ''
# Decoder exited early - capture any output from PTY
error_output = ''
try:
ready, _, _ = select.select([master_fd], [], [], 0.5)
if ready:
raw = os.read(master_fd, 500)
error_output = raw.decode('utf-8', errors='replace')
except Exception:
pass
error_msg = f'{decoder_name} failed to start'
if error_output:
error_msg += f': {error_output}'
logger.error(error_msg)
try:
os.close(master_fd)
except OSError:
pass
try:
rtl_process.kill()
except Exception:
@@ -1847,11 +1893,12 @@ def start_aprs() -> Response:
# Store references for status checks and cleanup
app_module.aprs_process = decoder_process
app_module.aprs_rtl_process = rtl_process
app_module.aprs_master_fd = master_fd
# Start background thread to read decoder output and push to queue
thread = threading.Thread(
target=stream_aprs_output,
args=(rtl_process, decoder_process),
args=(master_fd, rtl_process, decoder_process),
daemon=True
)
thread.start()
@@ -1902,6 +1949,14 @@ def stop_aprs() -> Response:
except Exception as e:
logger.error(f"Error stopping APRS process: {e}")
# Close PTY master fd
if hasattr(app_module, 'aprs_master_fd') and app_module.aprs_master_fd is not None:
try:
os.close(app_module.aprs_master_fd)
except OSError:
pass
app_module.aprs_master_fd = None
app_module.aprs_process = None
if hasattr(app_module, 'aprs_rtl_process'):
app_module.aprs_rtl_process = None