mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
Use PTY for SatDump output capture instead of pipe
SatDump writes to stderr via fwrite() with its custom logger. When stderr is redirected to a pipe, C runtime fully buffers it. Neither stdbuf nor bufsize settings help since SatDump doesn't use stdio for output. PTY (pseudo-terminal) makes SatDump think it's writing to a real terminal, which disables buffering. Also strips ANSI escape codes from the output and properly handles \r progress lines. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -14,8 +14,11 @@ rtl_fm capture for manual decoding when SatDump is unavailable.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import os
|
||||
import pty
|
||||
import re
|
||||
import select
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
@@ -147,6 +150,7 @@ class WeatherSatDecoder:
|
||||
self._images: list[WeatherSatImage] = []
|
||||
self._reader_thread: threading.Thread | None = None
|
||||
self._watcher_thread: threading.Thread | None = None
|
||||
self._pty_master_fd: int | None = None
|
||||
self._current_satellite: str = ''
|
||||
self._current_frequency: float = 0.0
|
||||
self._current_mode: str = ''
|
||||
@@ -297,7 +301,7 @@ class WeatherSatDecoder:
|
||||
# Auto-detect serial by querying rtl_eeprom, fall back to string index.
|
||||
source_id = self._resolve_device_id(device_index)
|
||||
|
||||
satdump_cmd = [
|
||||
cmd = [
|
||||
'satdump', 'live',
|
||||
sat_info['pipeline'],
|
||||
str(self._capture_output_dir),
|
||||
@@ -308,43 +312,52 @@ class WeatherSatDecoder:
|
||||
'--source_id', source_id,
|
||||
]
|
||||
|
||||
# Wrap with stdbuf to disable output buffering.
|
||||
# SatDump (C++) fully buffers stdout when writing to a pipe,
|
||||
# which prevents our reader from seeing any output until exit.
|
||||
if shutil.which('stdbuf'):
|
||||
cmd = ['stdbuf', '-o0', '-e0'] + satdump_cmd
|
||||
else:
|
||||
cmd = satdump_cmd
|
||||
|
||||
if bias_t:
|
||||
cmd.append('--bias')
|
||||
|
||||
logger.info(f"Starting SatDump: {' '.join(cmd)}")
|
||||
|
||||
# Use a pseudo-terminal so SatDump thinks it's writing to a real
|
||||
# terminal. C/C++ runtimes disable buffering on TTYs, which lets
|
||||
# us see output (including \r progress lines) in real time.
|
||||
master_fd, slave_fd = pty.openpty()
|
||||
self._pty_master_fd = master_fd
|
||||
|
||||
self._process = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
bufsize=1, # line-buffered
|
||||
env={**os.environ, 'PYTHONUNBUFFERED': '1'},
|
||||
stdout=slave_fd,
|
||||
stderr=slave_fd,
|
||||
stdin=subprocess.DEVNULL,
|
||||
close_fds=True,
|
||||
)
|
||||
os.close(slave_fd) # parent doesn't need the slave side
|
||||
|
||||
# Check for early exit (SatDump errors out immediately)
|
||||
try:
|
||||
retcode = self._process.wait(timeout=3)
|
||||
# Process already died — read whatever output it produced
|
||||
output = ''
|
||||
if self._process.stdout:
|
||||
output = self._process.stdout.read()
|
||||
output = b''
|
||||
try:
|
||||
while True:
|
||||
r, _, _ = select.select([master_fd], [], [], 0.1)
|
||||
if not r:
|
||||
break
|
||||
chunk = os.read(master_fd, 4096)
|
||||
if not chunk:
|
||||
break
|
||||
output += chunk
|
||||
except OSError:
|
||||
pass
|
||||
os.close(master_fd)
|
||||
self._pty_master_fd = None
|
||||
output_str = output.decode('utf-8', errors='replace')
|
||||
error_msg = f"SatDump exited immediately (code {retcode})"
|
||||
if output:
|
||||
# Extract the most useful error line
|
||||
for line in output.strip().splitlines():
|
||||
if output_str:
|
||||
for line in output_str.strip().splitlines():
|
||||
if 'error' in line.lower() or 'could not' in line.lower() or 'cannot' in line.lower():
|
||||
error_msg = line.strip()
|
||||
break
|
||||
logger.error(f"SatDump output:\n{output}")
|
||||
logger.error(f"SatDump output:\n{output_str}")
|
||||
self._process = None
|
||||
raise RuntimeError(error_msg)
|
||||
except subprocess.TimeoutExpired:
|
||||
@@ -416,39 +429,67 @@ class WeatherSatDecoder:
|
||||
# Fall back to string index
|
||||
return str(device_index)
|
||||
|
||||
@staticmethod
|
||||
def _read_lines(stream):
|
||||
"""Read lines from stream, splitting on both \\n and \\r.
|
||||
def _read_pty_lines(self):
|
||||
"""Read lines from the PTY master fd, splitting on \\n and \\r.
|
||||
|
||||
SatDump uses \\r carriage returns for progress updates that overwrite
|
||||
the same terminal line. Python's readline() only splits on \\n, so
|
||||
those updates never arrive. This reads char-by-char and yields
|
||||
complete lines on either delimiter.
|
||||
SatDump uses \\r carriage returns for progress updates. A PTY gives
|
||||
us unbuffered output. We use select() to detect data availability
|
||||
and os.read() for raw bytes, then split on line boundaries.
|
||||
"""
|
||||
buf = []
|
||||
while True:
|
||||
ch = stream.read(1)
|
||||
if not ch:
|
||||
# EOF
|
||||
if buf:
|
||||
yield ''.join(buf)
|
||||
return
|
||||
if ch in ('\n', '\r'):
|
||||
if buf:
|
||||
yield ''.join(buf)
|
||||
buf = []
|
||||
else:
|
||||
buf.append(ch)
|
||||
master_fd = self._pty_master_fd
|
||||
if master_fd is None:
|
||||
return
|
||||
|
||||
buf = b''
|
||||
while self._running:
|
||||
try:
|
||||
r, _, _ = select.select([master_fd], [], [], 1.0)
|
||||
if not r:
|
||||
# Timeout — check if process is still alive
|
||||
if self._process and self._process.poll() is not None:
|
||||
break
|
||||
continue
|
||||
chunk = os.read(master_fd, 4096)
|
||||
if not chunk:
|
||||
break
|
||||
buf += chunk
|
||||
# Split on \r and \n
|
||||
while b'\n' in buf or b'\r' in buf:
|
||||
# Find earliest delimiter
|
||||
idx_n = buf.find(b'\n')
|
||||
idx_r = buf.find(b'\r')
|
||||
if idx_n == -1:
|
||||
idx = idx_r
|
||||
elif idx_r == -1:
|
||||
idx = idx_n
|
||||
else:
|
||||
idx = min(idx_n, idx_r)
|
||||
line = buf[:idx]
|
||||
buf = buf[idx + 1:]
|
||||
# Skip empty lines
|
||||
text = line.decode('utf-8', errors='replace').strip()
|
||||
# Strip ANSI escape codes that terminals produce
|
||||
text = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', text)
|
||||
if text:
|
||||
yield text
|
||||
except OSError:
|
||||
break
|
||||
# Drain remaining buffer
|
||||
text = buf.decode('utf-8', errors='replace').strip()
|
||||
if text:
|
||||
text = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', text)
|
||||
if text:
|
||||
yield text
|
||||
|
||||
def _read_satdump_output(self) -> None:
|
||||
"""Read SatDump stdout/stderr for progress updates."""
|
||||
if not self._process or not self._process.stdout:
|
||||
if not self._process or self._pty_master_fd is None:
|
||||
return
|
||||
|
||||
last_emit_time = 0.0
|
||||
|
||||
try:
|
||||
for line in self._read_lines(self._process.stdout):
|
||||
for line in self._read_pty_lines():
|
||||
if not self._running:
|
||||
break
|
||||
|
||||
@@ -544,6 +585,14 @@ class WeatherSatDecoder:
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading SatDump output: {e}")
|
||||
finally:
|
||||
# Close PTY master fd
|
||||
if self._pty_master_fd is not None:
|
||||
try:
|
||||
os.close(self._pty_master_fd)
|
||||
except OSError:
|
||||
pass
|
||||
self._pty_master_fd = None
|
||||
|
||||
# Process ended — release resources
|
||||
was_running = self._running
|
||||
self._running = False
|
||||
@@ -670,6 +719,13 @@ class WeatherSatDecoder:
|
||||
with self._lock:
|
||||
self._running = False
|
||||
|
||||
if self._pty_master_fd is not None:
|
||||
try:
|
||||
os.close(self._pty_master_fd)
|
||||
except OSError:
|
||||
pass
|
||||
self._pty_master_fd = None
|
||||
|
||||
if self._process:
|
||||
try:
|
||||
self._process.terminate()
|
||||
|
||||
Reference in New Issue
Block a user