Add merged-stream rtl_fm fallback for Morse startup

This commit is contained in:
Smittix
2026-02-26 12:58:29 +00:00
parent a5eefc712a
commit 81e5f5479f
2 changed files with 93 additions and 21 deletions
+47 -21
View File
@@ -363,6 +363,16 @@ def start_morse() -> Response:
'force_squelch_off': True,
'add_resample_rate': True,
'add_dc_fast': True,
'merge_stderr': False,
},
{
# Last-resort compatibility mode: some rtl_fm variants may route
# payload unexpectedly; merge stderr->stdout and strip text logs.
'use_direct_sampling': False,
'force_squelch_off': False,
'add_resample_rate': True,
'add_dc_fast': True,
'merge_stderr': True,
},
]
else:
@@ -372,18 +382,28 @@ def start_morse() -> Response:
'force_squelch_off': False,
'add_resample_rate': False,
'add_dc_fast': False,
'merge_stderr': False,
},
{
'use_direct_sampling': False,
'force_squelch_off': False,
'add_resample_rate': True,
'add_dc_fast': True,
'merge_stderr': False,
},
{
'use_direct_sampling': False,
'force_squelch_off': True,
'add_resample_rate': True,
'add_dc_fast': True,
'merge_stderr': False,
},
{
'use_direct_sampling': False,
'force_squelch_off': False,
'add_resample_rate': True,
'add_dc_fast': True,
'merge_stderr': True,
},
]
@@ -438,6 +458,7 @@ def start_morse() -> Response:
force_squelch_off = bool(attempt.get('force_squelch_off', True))
add_resample_rate = bool(attempt.get('add_resample_rate', False))
add_dc_fast = bool(attempt.get('add_dc_fast', False))
merge_stderr = bool(attempt.get('merge_stderr', False))
rtl_cmd = _build_rtl_cmd(
use_direct_sampling=use_direct_sampling,
@@ -457,7 +478,7 @@ def start_morse() -> Response:
rtl_process = subprocess.Popen(
rtl_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stderr=subprocess.STDOUT if merge_stderr else subprocess.PIPE,
bufsize=0,
)
register_process(rtl_process)
@@ -466,26 +487,29 @@ def start_morse() -> Response:
control_queue = queue.Queue(maxsize=16)
pcm_ready_event = threading.Event()
def monitor_stderr(
proc: subprocess.Popen = rtl_process,
proc_stop_event: threading.Event = stop_event,
) -> None:
if proc.stderr is None:
return
for line in proc.stderr:
if proc_stop_event.is_set():
break
err_text = line.decode('utf-8', errors='replace').strip()
if err_text:
logger.debug(f'[rtl_fm/morse] {err_text}')
with contextlib.suppress(queue.Full):
app_module.morse_queue.put_nowait({
'type': 'info',
'text': f'[rtl_fm] {err_text}',
})
if not merge_stderr:
def monitor_stderr(
proc: subprocess.Popen = rtl_process,
proc_stop_event: threading.Event = stop_event,
) -> None:
if proc.stderr is None:
return
for line in proc.stderr:
if proc_stop_event.is_set():
break
err_text = line.decode('utf-8', errors='replace').strip()
if err_text:
logger.debug(f'[rtl_fm/morse] {err_text}')
with contextlib.suppress(queue.Full):
app_module.morse_queue.put_nowait({
'type': 'info',
'text': f'[rtl_fm] {err_text}',
})
stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr')
stderr_thread.start()
stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr')
stderr_thread.start()
else:
stderr_thread = None
decoder_thread = threading.Thread(
target=morse_decoder_thread,
@@ -501,6 +525,7 @@ def start_morse() -> Response:
'decoder_config': runtime_config,
'control_queue': control_queue,
'pcm_ready_event': pcm_ready_event,
'strip_text_chunks': merge_stderr,
},
daemon=True,
name='morse-decoder',
@@ -525,6 +550,7 @@ def start_morse() -> Response:
runtime_config['force_squelch_off'] = force_squelch_off
runtime_config['resample_rate'] = sample_rate if add_resample_rate else None
runtime_config['dc_fast'] = add_dc_fast
runtime_config['merge_stderr'] = merge_stderr
break
if not startup_error:
@@ -533,7 +559,7 @@ def start_morse() -> Response:
attempt_errors.append(
f'attempt {attempt_index}/{len(command_attempts)} '
f'(direct={int(use_direct_sampling)} squelch_forced={int(force_squelch_off)} '
f'resample={int(add_resample_rate)} dc_fast={int(add_dc_fast)}): {startup_error}'
f'resample={int(add_resample_rate)} dc_fast={int(add_dc_fast)} merged={int(merge_stderr)}): {startup_error}'
)
logger.warning(f'Morse startup attempt failed: {attempt_errors[-1]}')
+46
View File
@@ -725,6 +725,34 @@ def _emit_waiting_scope(output_queue: queue.Queue, waiting_since: float) -> None
})
def _is_probably_rtl_log_text(data: bytes) -> bool:
"""Heuristic: identify rtl_fm stderr log chunks when streams are merged."""
if not data:
return False
# PCM usually contains NULLs/non-printables; plain log lines do not.
if b'\x00' in data:
return False
printable = sum(1 for b in data if (32 <= b <= 126) or b in (9, 10, 13))
ratio = printable / max(1, len(data))
if ratio < 0.92:
return False
lower = data.lower()
keywords = (
b'rtl_fm',
b'found ',
b'using device',
b'tuned to',
b'sampling at',
b'output at',
b'buffer size',
b'gain',
b'direct sampling',
b'oversampling',
b'exact sample rate',
)
return any(token in lower for token in keywords)
def morse_decoder_thread(
rtl_stdout,
output_queue: queue.Queue,
@@ -735,6 +763,7 @@ def morse_decoder_thread(
decoder_config: dict[str, Any] | None = None,
control_queue: queue.Queue | None = None,
pcm_ready_event: threading.Event | None = None,
strip_text_chunks: bool = False,
) -> None:
"""Decode Morse from live PCM stream and push events to *output_queue*."""
import logging
@@ -798,6 +827,23 @@ def morse_decoder_thread(
if not data:
break
if strip_text_chunks and _is_probably_rtl_log_text(data):
try:
text = data.decode('utf-8', errors='replace')
except Exception:
text = ''
if text:
for line in text.splitlines():
clean = line.strip()
if not clean:
continue
with contextlib.suppress(queue.Full):
output_queue.put_nowait({
'type': 'info',
'text': f'[rtl_fm] {clean}',
})
continue
try:
raw_queue.put(data, timeout=0.2)
except queue.Full: