Fix DMR audio/text deadlock: start ffmpeg per-client, not at launch

Starting ffmpeg at decoder launch caused a pipe-buffer deadlock: ffmpeg
stdout filled up (~64KB on Linux) before the browser connected to the
audio stream, back-pressuring the entire pipeline and freezing dsd-fme
stderr output (no text data, no syncs, no calls).

New architecture: a mux thread always drains dsd-fme stdout to keep the
pipeline flowing. ffmpeg starts lazily per-client when /dmr/audio/stream
is requested (matching the listening post pattern). The mux forwards
decoded audio to the active ffmpeg with silence fill during voice gaps,
and discards audio when no client is connected.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-09 19:25:07 +00:00
parent fce66a6a60
commit 00c9a6fdd9

View File

@@ -37,14 +37,19 @@ dmr_bp = Blueprint('dmr', __name__, url_prefix='/dmr')
dmr_rtl_process: Optional[subprocess.Popen] = None
dmr_dsd_process: Optional[subprocess.Popen] = None
dmr_audio_process: Optional[subprocess.Popen] = None
dmr_thread: Optional[threading.Thread] = None
dmr_running = False
dmr_audio_running = False
dmr_has_audio = False # True when ffmpeg available and dsd outputs audio
dmr_lock = threading.Lock()
dmr_queue: queue.Queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
dmr_active_device: Optional[int] = None
# Audio mux: the sole reader of dsd-fme stdout. Writes to an ffmpeg
# stdin when a streaming client is connected, discards otherwise.
# This prevents dsd-fme from blocking on stdout (which would also
# freeze stderr / text data output).
_active_ffmpeg_stdin: Optional[object] = None # set by stream endpoint
VALID_PROTOCOLS = ['auto', 'dmr', 'p25', 'nxdn', 'dstar', 'provoice']
# Classic dsd flags
@@ -209,40 +214,40 @@ _HEARTBEAT_INTERVAL = 3.0 # seconds between heartbeats when decoder is idle
_SILENCE_CHUNK = b'\x00' * 1600
def _audio_bridge(dsd_stdout, ffmpeg_stdin):
"""Bridge DSD audio output to ffmpeg, inserting silence during gaps.
def _dsd_audio_mux(dsd_stdout):
"""Mux thread: sole reader of dsd-fme stdout.
Digital voice is intermittent the decoder only outputs PCM during
active voice transmissions. Without this bridge, ffmpeg would block
waiting for its first input bytes and never write the WAV header,
causing the browser ``<audio>`` element to fail with
"no supported source found".
The bridge feeds 100ms silence chunks during gaps so ffmpeg always
has input, the WAV header is written immediately, and the browser
receives a continuous audio stream (silence when idle, decoded voice
when active).
Always drains dsd-fme's audio output to prevent the process from
blocking on stdout writes (which would also freeze stderr / text
data). When an audio streaming client is connected, forwards audio
to its ffmpeg stdin with silence fill during voice gaps. When no
client is connected, simply discards the data.
"""
try:
while dmr_audio_running:
while dmr_running:
ready, _, _ = select.select([dsd_stdout], [], [], 0.1)
if ready:
data = os.read(dsd_stdout.fileno(), 4096)
if not data:
break
ffmpeg_stdin.write(data)
ffmpeg_stdin.flush()
sink = _active_ffmpeg_stdin
if sink:
try:
sink.write(data)
sink.flush()
except (BrokenPipeError, OSError, ValueError):
pass
else:
# No audio from decoder — feed silence to keep stream alive
ffmpeg_stdin.write(_SILENCE_CHUNK)
ffmpeg_stdin.flush()
except (BrokenPipeError, OSError, ValueError):
# No audio from decoder — feed silence if client connected
sink = _active_ffmpeg_stdin
if sink:
try:
sink.write(_SILENCE_CHUNK)
sink.flush()
except (BrokenPipeError, OSError, ValueError):
pass
except (OSError, ValueError):
pass
finally:
try:
ffmpeg_stdin.close()
except Exception:
pass
def _queue_put(event: dict):
@@ -311,9 +316,7 @@ def stream_dsd_output(rtl_process: subprocess.Popen, dsd_process: subprocess.Pop
logger.error(f"DSD stream error: {e}")
finally:
global dmr_active_device, dmr_rtl_process, dmr_dsd_process
global dmr_audio_process, dmr_audio_running
dmr_running = False
dmr_audio_running = False
# Capture exit info for diagnostics
rc = dsd_process.poll()
reason = 'stopped'
@@ -327,8 +330,8 @@ def stream_dsd_output(rtl_process: subprocess.Popen, dsd_process: subprocess.Pop
except Exception:
pass
logger.warning(f"DSD process exited with code {rc}: {detail}")
# Cleanup all pipeline processes (audio encoder + decoder + demod)
for proc in [dmr_audio_process, dsd_process, rtl_process]:
# Cleanup decoder + demod processes
for proc in [dsd_process, rtl_process]:
if proc and proc.poll() is None:
try:
proc.terminate()
@@ -342,7 +345,6 @@ def stream_dsd_output(rtl_process: subprocess.Popen, dsd_process: subprocess.Pop
unregister_process(proc)
dmr_rtl_process = None
dmr_dsd_process = None
dmr_audio_process = None
_queue_put({'type': 'status', 'text': reason, 'exit_code': rc, 'detail': detail})
# Release SDR device
if dmr_active_device is not None:
@@ -373,8 +375,8 @@ def check_tools() -> Response:
@dmr_bp.route('/start', methods=['POST'])
def start_dmr() -> Response:
"""Start digital voice decoding."""
global dmr_rtl_process, dmr_dsd_process, dmr_audio_process, dmr_thread
global dmr_running, dmr_audio_running, dmr_active_device
global dmr_rtl_process, dmr_dsd_process, dmr_thread
global dmr_running, dmr_has_audio, dmr_active_device
with dmr_lock:
if dmr_running:
@@ -482,37 +484,16 @@ def start_dmr() -> Response:
# Allow rtl_fm to send directly to dsd
dmr_rtl_process.stdout.close()
# Start ffmpeg to transcode DSD 8kHz s16le PCM → 44.1kHz WAV.
# A bridge thread reads DSD stdout and writes to ffmpeg stdin,
# inserting silence during voice gaps so ffmpeg always has input
# and the browser receives a continuous WAV stream.
# Start mux thread: always drains dsd-fme stdout to prevent the
# process from blocking (which would freeze stderr / text data).
# ffmpeg is started lazily per-client in /dmr/audio/stream.
if ffmpeg_path and dmr_dsd_process.stdout:
encoder_cmd = [
ffmpeg_path, '-hide_banner', '-loglevel', 'error',
'-fflags', 'nobuffer', '-flags', 'low_delay',
'-probesize', '32', '-analyzeduration', '0',
'-f', 's16le', '-ar', '8000', '-ac', '1', '-i', 'pipe:0',
'-acodec', 'pcm_s16le', '-ar', '44100', '-f', 'wav', 'pipe:1',
]
dmr_audio_process = subprocess.Popen(
encoder_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
register_process(dmr_audio_process)
dmr_audio_running = True
# Bridge: reads DSD audio, feeds ffmpeg with silence fill
dmr_has_audio = True
threading.Thread(
target=_audio_bridge,
args=(dmr_dsd_process.stdout, dmr_audio_process.stdin),
target=_dsd_audio_mux,
args=(dmr_dsd_process.stdout,),
daemon=True,
).start()
# Drain ffmpeg stderr to prevent blocking
threading.Thread(
target=lambda p: [None for _ in p.stderr],
args=(dmr_audio_process,), daemon=True,
).start()
time.sleep(0.3)
@@ -528,8 +509,8 @@ def start_dmr() -> Response:
dsd_err = dmr_dsd_process.stderr.read().decode('utf-8', errors='replace')[:500]
logger.error(f"DSD pipeline died: rtl_fm rc={rtl_rc} err={rtl_err!r}, dsd rc={dsd_rc} err={dsd_err!r}")
# Terminate surviving processes and unregister all
dmr_audio_running = False
for proc in [dmr_audio_process, dmr_dsd_process, dmr_rtl_process]:
dmr_has_audio = False
for proc in [dmr_dsd_process, dmr_rtl_process]:
if proc and proc.poll() is None:
try:
proc.terminate()
@@ -543,7 +524,6 @@ def start_dmr() -> Response:
unregister_process(proc)
dmr_rtl_process = None
dmr_dsd_process = None
dmr_audio_process = None
if dmr_active_device is not None:
app_module.release_sdr_device(dmr_active_device)
dmr_active_device = None
@@ -579,7 +559,7 @@ def start_dmr() -> Response:
'status': 'started',
'frequency': frequency,
'protocol': protocol,
'has_audio': dmr_audio_running,
'has_audio': dmr_has_audio,
})
except Exception as e:
@@ -593,14 +573,14 @@ def start_dmr() -> Response:
@dmr_bp.route('/stop', methods=['POST'])
def stop_dmr() -> Response:
"""Stop digital voice decoding."""
global dmr_rtl_process, dmr_dsd_process, dmr_audio_process
global dmr_running, dmr_audio_running, dmr_active_device
global dmr_rtl_process, dmr_dsd_process
global dmr_running, dmr_has_audio, dmr_active_device
with dmr_lock:
dmr_running = False
dmr_audio_running = False
dmr_has_audio = False
for proc in [dmr_audio_process, dmr_dsd_process, dmr_rtl_process]:
for proc in [dmr_dsd_process, dmr_rtl_process]:
if proc and proc.poll() is None:
try:
proc.terminate()
@@ -615,7 +595,6 @@ def stop_dmr() -> Response:
dmr_rtl_process = None
dmr_dsd_process = None
dmr_audio_process = None
if dmr_active_device is not None:
app_module.release_sdr_device(dmr_active_device)
@@ -630,46 +609,86 @@ def dmr_status() -> Response:
return jsonify({
'running': dmr_running,
'device': dmr_active_device,
'has_audio': dmr_audio_running,
'has_audio': dmr_has_audio,
})
@dmr_bp.route('/audio/stream')
def stream_dmr_audio() -> Response:
"""Stream decoded digital voice audio as WAV."""
# Wait briefly for audio pipeline to be ready
for _ in range(100):
if dmr_audio_running and dmr_audio_process:
break
time.sleep(0.05)
"""Stream decoded digital voice audio as WAV.
if not dmr_audio_running or not dmr_audio_process:
Starts a per-client ffmpeg encoder. The global mux thread
(_dsd_audio_mux) forwards DSD audio to this ffmpeg's stdin while
the client is connected, and discards audio otherwise. This avoids
the pipe-buffer deadlock that occurs when ffmpeg is started at
decoder launch (its stdout fills up before any HTTP client reads
it, back-pressuring the entire pipeline and freezing stderr/text
data output).
"""
global _active_ffmpeg_stdin
if not dmr_running or not dmr_has_audio:
return Response(b'', mimetype='audio/wav', status=204)
ffmpeg_path = find_ffmpeg()
if not ffmpeg_path:
return Response(b'', mimetype='audio/wav', status=503)
encoder_cmd = [
ffmpeg_path, '-hide_banner', '-loglevel', 'error',
'-fflags', 'nobuffer', '-flags', 'low_delay',
'-probesize', '32', '-analyzeduration', '0',
'-f', 's16le', '-ar', '8000', '-ac', '1', '-i', 'pipe:0',
'-acodec', 'pcm_s16le', '-ar', '44100', '-f', 'wav', 'pipe:1',
]
audio_proc = subprocess.Popen(
encoder_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Drain ffmpeg stderr to prevent blocking
threading.Thread(
target=lambda p: [None for _ in p.stderr],
args=(audio_proc,), daemon=True,
).start()
# Tell the mux thread to start writing to this ffmpeg
_active_ffmpeg_stdin = audio_proc.stdin
def generate():
proc = dmr_audio_process
if not proc or not proc.stdout:
return
global _active_ffmpeg_stdin
try:
# The audio bridge thread feeds silence during voice gaps,
# so ffmpeg always produces output and the first chunk
# arrives quickly. We still use select() to avoid blocking
# forever if the process dies unexpectedly.
while dmr_audio_running and proc.poll() is None:
ready, _, _ = select.select([proc.stdout], [], [], 2.0)
while dmr_running and audio_proc.poll() is None:
ready, _, _ = select.select([audio_proc.stdout], [], [], 2.0)
if ready:
chunk = proc.stdout.read(4096)
chunk = audio_proc.stdout.read(4096)
if chunk:
yield chunk
else:
break
else:
if proc.poll() is not None:
if audio_proc.poll() is not None:
break
except GeneratorExit:
pass
except Exception as e:
logger.error(f"DMR audio stream error: {e}")
finally:
# Disconnect mux → ffmpeg, then clean up
_active_ffmpeg_stdin = None
try:
audio_proc.stdin.close()
except Exception:
pass
try:
audio_proc.terminate()
audio_proc.wait(timeout=2)
except Exception:
try:
audio_proc.kill()
except Exception:
pass
return Response(
generate(),