mirror of
https://github.com/smittix/intercept.git
synced 2026-06-12 16:03:29 -07:00
Harden Morse startup PCM detection and retry fallback
This commit is contained in:
+173
-80
@@ -278,39 +278,52 @@ def start_morse() -> Response:
|
||||
sdr_device = SDRFactory.create_default_device(sdr_type, index=device)
|
||||
builder = SDRFactory.get_builder(sdr_device.sdr_type)
|
||||
|
||||
fm_kwargs: dict[str, Any] = {
|
||||
'device': sdr_device,
|
||||
'frequency_mhz': freq,
|
||||
'sample_rate': sample_rate,
|
||||
'gain': float(gain) if gain and gain != '0' else None,
|
||||
'ppm': int(ppm) if ppm and ppm != '0' else None,
|
||||
'modulation': 'usb',
|
||||
'bias_t': bias_t,
|
||||
}
|
||||
def _build_rtl_cmd(*, use_direct_sampling: bool, force_squelch_off: bool) -> list[str]:
|
||||
fm_kwargs: dict[str, Any] = {
|
||||
'device': sdr_device,
|
||||
'frequency_mhz': freq,
|
||||
'sample_rate': sample_rate,
|
||||
'gain': float(gain) if gain and gain != '0' else None,
|
||||
'ppm': int(ppm) if ppm and ppm != '0' else None,
|
||||
'modulation': 'usb',
|
||||
'bias_t': bias_t,
|
||||
}
|
||||
|
||||
# Only rtl_fm supports direct sampling flags.
|
||||
if sdr_device.sdr_type == SDRType.RTL_SDR and freq < 24.0:
|
||||
fm_kwargs['direct_sampling'] = 2
|
||||
# Only rtl_fm supports direct sampling flags.
|
||||
if use_direct_sampling:
|
||||
fm_kwargs['direct_sampling'] = 2
|
||||
|
||||
rtl_cmd = builder.build_fm_demod_command(
|
||||
**fm_kwargs,
|
||||
)
|
||||
cmd = builder.build_fm_demod_command(**fm_kwargs)
|
||||
|
||||
# Some rtl_fm builds behave as if squelch is enabled unless -l is explicit.
|
||||
# Force continuous audio for CW analysis.
|
||||
if sdr_device.sdr_type == SDRType.RTL_SDR and '-l' not in rtl_cmd:
|
||||
if rtl_cmd and rtl_cmd[-1] == '-':
|
||||
rtl_cmd[-1:-1] = ['-l', '0']
|
||||
else:
|
||||
rtl_cmd.extend(['-l', '0'])
|
||||
# Some rtl_fm builds behave as if squelch is enabled unless -l is explicit.
|
||||
# Force continuous audio for CW analysis.
|
||||
if force_squelch_off and sdr_device.sdr_type == SDRType.RTL_SDR and '-l' not in cmd:
|
||||
if cmd and cmd[-1] == '-':
|
||||
cmd[-1:-1] = ['-l', '0']
|
||||
else:
|
||||
cmd.extend(['-l', '0'])
|
||||
return cmd
|
||||
|
||||
full_cmd = ' '.join(rtl_cmd)
|
||||
logger.info(f'Morse decoder running: {full_cmd}')
|
||||
can_try_direct_sampling = bool(sdr_device.sdr_type == SDRType.RTL_SDR and freq < 24.0)
|
||||
if can_try_direct_sampling:
|
||||
# Prefer direct2 with explicit squelch-off, then retry with safer variants
|
||||
# if the command starts but never emits PCM.
|
||||
command_attempts = [
|
||||
{'use_direct_sampling': True, 'force_squelch_off': True},
|
||||
{'use_direct_sampling': True, 'force_squelch_off': False},
|
||||
{'use_direct_sampling': False, 'force_squelch_off': True},
|
||||
]
|
||||
else:
|
||||
command_attempts = [
|
||||
{'use_direct_sampling': False, 'force_squelch_off': True},
|
||||
{'use_direct_sampling': False, 'force_squelch_off': False},
|
||||
]
|
||||
|
||||
rtl_process: subprocess.Popen | None = None
|
||||
stop_event: threading.Event | None = None
|
||||
decoder_thread: threading.Thread | None = None
|
||||
stderr_thread: threading.Thread | None = None
|
||||
control_queue: queue.Queue | None = None
|
||||
|
||||
runtime_config: dict[str, Any] = {
|
||||
'sample_rate': sample_rate,
|
||||
@@ -329,70 +342,150 @@ def start_morse() -> Response:
|
||||
}
|
||||
|
||||
try:
|
||||
rtl_process = subprocess.Popen(
|
||||
rtl_cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
bufsize=0,
|
||||
)
|
||||
register_process(rtl_process)
|
||||
def _cleanup_attempt(
|
||||
proc: subprocess.Popen | None,
|
||||
attempt_stop_event: threading.Event | None,
|
||||
attempt_control_queue: queue.Queue | None,
|
||||
attempt_decoder_thread: threading.Thread | None,
|
||||
attempt_stderr_thread: threading.Thread | None,
|
||||
) -> None:
|
||||
if attempt_stop_event is not None:
|
||||
attempt_stop_event.set()
|
||||
if attempt_control_queue is not None:
|
||||
with contextlib.suppress(queue.Full):
|
||||
attempt_control_queue.put_nowait({'cmd': 'shutdown'})
|
||||
if proc is not None:
|
||||
_close_pipe(getattr(proc, 'stdout', None))
|
||||
_close_pipe(getattr(proc, 'stderr', None))
|
||||
safe_terminate(proc, timeout=0.5)
|
||||
unregister_process(proc)
|
||||
_join_thread(attempt_decoder_thread, timeout_s=0.35)
|
||||
_join_thread(attempt_stderr_thread, timeout_s=0.35)
|
||||
|
||||
stop_event = threading.Event()
|
||||
control_queue: queue.Queue = queue.Queue(maxsize=16)
|
||||
attempt_errors: list[str] = []
|
||||
full_cmd = ''
|
||||
|
||||
def monitor_stderr() -> None:
|
||||
if not rtl_process or rtl_process.stderr is None:
|
||||
return
|
||||
for line in rtl_process.stderr:
|
||||
if stop_event.is_set():
|
||||
for attempt_index, attempt in enumerate(command_attempts, start=1):
|
||||
use_direct_sampling = bool(attempt.get('use_direct_sampling', False))
|
||||
force_squelch_off = bool(attempt.get('force_squelch_off', True))
|
||||
|
||||
rtl_cmd = _build_rtl_cmd(
|
||||
use_direct_sampling=use_direct_sampling,
|
||||
force_squelch_off=force_squelch_off,
|
||||
)
|
||||
full_cmd = ' '.join(rtl_cmd)
|
||||
logger.info(f'Morse decoder attempt {attempt_index}/{len(command_attempts)}: {full_cmd}')
|
||||
|
||||
with contextlib.suppress(queue.Full):
|
||||
app_module.morse_queue.put_nowait({
|
||||
'type': 'info',
|
||||
'text': f'[cmd] {full_cmd}',
|
||||
})
|
||||
|
||||
rtl_process = subprocess.Popen(
|
||||
rtl_cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
bufsize=0,
|
||||
)
|
||||
register_process(rtl_process)
|
||||
|
||||
stop_event = threading.Event()
|
||||
control_queue = queue.Queue(maxsize=16)
|
||||
pcm_ready_event = threading.Event()
|
||||
|
||||
def monitor_stderr(
|
||||
proc: subprocess.Popen = rtl_process,
|
||||
proc_stop_event: threading.Event = stop_event,
|
||||
) -> None:
|
||||
if proc.stderr is None:
|
||||
return
|
||||
for line in proc.stderr:
|
||||
if proc_stop_event.is_set():
|
||||
break
|
||||
err_text = line.decode('utf-8', errors='replace').strip()
|
||||
if err_text:
|
||||
logger.debug(f'[rtl_fm/morse] {err_text}')
|
||||
with contextlib.suppress(queue.Full):
|
||||
app_module.morse_queue.put_nowait({
|
||||
'type': 'info',
|
||||
'text': f'[rtl_fm] {err_text}',
|
||||
})
|
||||
|
||||
stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr')
|
||||
stderr_thread.start()
|
||||
|
||||
decoder_thread = threading.Thread(
|
||||
target=morse_decoder_thread,
|
||||
args=(
|
||||
rtl_process.stdout,
|
||||
app_module.morse_queue,
|
||||
stop_event,
|
||||
sample_rate,
|
||||
tone_freq,
|
||||
wpm,
|
||||
),
|
||||
kwargs={
|
||||
'decoder_config': runtime_config,
|
||||
'control_queue': control_queue,
|
||||
'pcm_ready_event': pcm_ready_event,
|
||||
},
|
||||
daemon=True,
|
||||
name='morse-decoder',
|
||||
)
|
||||
decoder_thread.start()
|
||||
|
||||
startup_deadline = time.monotonic() + 2.5
|
||||
startup_ok = False
|
||||
startup_error = ''
|
||||
|
||||
while time.monotonic() < startup_deadline:
|
||||
if pcm_ready_event.is_set():
|
||||
startup_ok = True
|
||||
break
|
||||
err_text = line.decode('utf-8', errors='replace').strip()
|
||||
if err_text:
|
||||
logger.debug(f'[rtl_fm/morse] {err_text}')
|
||||
with contextlib.suppress(queue.Full):
|
||||
app_module.morse_queue.put_nowait({
|
||||
'type': 'info',
|
||||
'text': f'[rtl_fm] {err_text}',
|
||||
})
|
||||
if rtl_process.poll() is not None:
|
||||
startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})'
|
||||
break
|
||||
time.sleep(0.05)
|
||||
|
||||
stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr')
|
||||
stderr_thread.start()
|
||||
if startup_ok:
|
||||
runtime_config['direct_sampling'] = 2 if use_direct_sampling else 0
|
||||
runtime_config['force_squelch_off'] = force_squelch_off
|
||||
break
|
||||
|
||||
decoder_thread = threading.Thread(
|
||||
target=morse_decoder_thread,
|
||||
args=(
|
||||
rtl_process.stdout,
|
||||
app_module.morse_queue,
|
||||
if not startup_error:
|
||||
startup_error = 'No PCM samples received within startup timeout'
|
||||
|
||||
attempt_errors.append(
|
||||
f'attempt {attempt_index}/{len(command_attempts)} '
|
||||
f'(direct={int(use_direct_sampling)} squelch_forced={int(force_squelch_off)}): {startup_error}'
|
||||
)
|
||||
logger.warning(f'Morse startup attempt failed: {attempt_errors[-1]}')
|
||||
|
||||
with contextlib.suppress(queue.Full):
|
||||
app_module.morse_queue.put_nowait({
|
||||
'type': 'info',
|
||||
'text': f'[morse] startup attempt failed: {startup_error}',
|
||||
})
|
||||
|
||||
_cleanup_attempt(
|
||||
rtl_process,
|
||||
stop_event,
|
||||
sample_rate,
|
||||
tone_freq,
|
||||
wpm,
|
||||
),
|
||||
kwargs={
|
||||
'decoder_config': runtime_config,
|
||||
'control_queue': control_queue,
|
||||
},
|
||||
daemon=True,
|
||||
name='morse-decoder',
|
||||
)
|
||||
decoder_thread.start()
|
||||
control_queue,
|
||||
decoder_thread,
|
||||
stderr_thread,
|
||||
)
|
||||
rtl_process = None
|
||||
stop_event = None
|
||||
control_queue = None
|
||||
decoder_thread = None
|
||||
stderr_thread = None
|
||||
|
||||
# Detect immediate startup failure (e.g. device busy, no device)
|
||||
time.sleep(0.30)
|
||||
if rtl_process.poll() is not None:
|
||||
stop_event.set()
|
||||
stderr_text = ''
|
||||
try:
|
||||
if rtl_process.stderr:
|
||||
stderr_text = rtl_process.stderr.read().decode('utf-8', errors='replace').strip()
|
||||
except Exception:
|
||||
stderr_text = ''
|
||||
msg = stderr_text or f'rtl_fm exited immediately (code {rtl_process.returncode})'
|
||||
if rtl_process is None or stop_event is None or control_queue is None or decoder_thread is None or stderr_thread is None:
|
||||
msg = 'rtl_fm started but no PCM stream was received.'
|
||||
if attempt_errors:
|
||||
msg = msg + ' ' + ' | '.join(attempt_errors[-2:])
|
||||
logger.error(f'Morse rtl_fm startup failed: {msg}')
|
||||
safe_terminate(rtl_process, timeout=0.4)
|
||||
unregister_process(rtl_process)
|
||||
_join_thread(decoder_thread, timeout_s=0.25)
|
||||
_join_thread(stderr_thread, timeout_s=0.25)
|
||||
with app_module.morse_lock:
|
||||
if morse_active_device is not None:
|
||||
app_module.release_sdr_device(morse_active_device)
|
||||
|
||||
@@ -317,6 +317,77 @@ class TestMorseLifecycleRoutes:
|
||||
assert final_status['state'] == 'idle'
|
||||
assert 0 in released_devices
|
||||
|
||||
def test_start_retries_after_early_process_exit(self, client, monkeypatch):
|
||||
_login_session(client)
|
||||
self._reset_route_state()
|
||||
|
||||
released_devices = []
|
||||
|
||||
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None)
|
||||
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx))
|
||||
|
||||
class DummyDevice:
|
||||
sdr_type = morse_routes.SDRType.RTL_SDR
|
||||
|
||||
class DummyBuilder:
|
||||
def build_fm_demod_command(self, **kwargs):
|
||||
cmd = ['rtl_fm', '-f', '14.060M', '-M', 'usb', '-s', '22050']
|
||||
if kwargs.get('direct_sampling') == 2:
|
||||
cmd.extend(['-E', 'direct2'])
|
||||
cmd.append('-')
|
||||
return cmd
|
||||
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice()))
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder()))
|
||||
|
||||
pcm = generate_morse_audio('E', wpm=15, sample_rate=22050)
|
||||
popen_cmds = []
|
||||
|
||||
class FakeProc:
|
||||
def __init__(self, stdout_bytes: bytes, returncode: int | None):
|
||||
self.stdout = io.BytesIO(stdout_bytes)
|
||||
self.stderr = io.BytesIO(b'')
|
||||
self.returncode = returncode
|
||||
|
||||
def poll(self):
|
||||
return self.returncode
|
||||
|
||||
def fake_popen(cmd, *args, **kwargs):
|
||||
popen_cmds.append(cmd)
|
||||
if len(popen_cmds) == 1:
|
||||
return FakeProc(b'', 1)
|
||||
return FakeProc(pcm, None)
|
||||
|
||||
monkeypatch.setattr(morse_routes.subprocess, 'Popen', fake_popen)
|
||||
monkeypatch.setattr(morse_routes, 'register_process', lambda _proc: None)
|
||||
monkeypatch.setattr(morse_routes, 'unregister_process', lambda _proc: None)
|
||||
monkeypatch.setattr(
|
||||
morse_routes,
|
||||
'safe_terminate',
|
||||
lambda proc, timeout=0.0: setattr(proc, 'returncode', 0),
|
||||
)
|
||||
|
||||
start_resp = client.post('/morse/start', json={
|
||||
'frequency': '14.060',
|
||||
'gain': '20',
|
||||
'ppm': '0',
|
||||
'device': '0',
|
||||
'tone_freq': '700',
|
||||
'wpm': '15',
|
||||
})
|
||||
assert start_resp.status_code == 200
|
||||
assert start_resp.get_json()['status'] == 'started'
|
||||
assert len(popen_cmds) >= 2
|
||||
assert '-E' in popen_cmds[0] and 'direct2' in popen_cmds[0]
|
||||
assert '-l' in popen_cmds[0]
|
||||
assert '-E' in popen_cmds[1] and 'direct2' in popen_cmds[1]
|
||||
assert '-l' not in popen_cmds[1]
|
||||
|
||||
stop_resp = client.post('/morse/stop')
|
||||
assert stop_resp.status_code == 200
|
||||
assert stop_resp.get_json()['status'] == 'stopped'
|
||||
assert 0 in released_devices
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration: synthetic CW -> WAV decode
|
||||
|
||||
@@ -734,6 +734,7 @@ def morse_decoder_thread(
|
||||
wpm: int = 15,
|
||||
decoder_config: dict[str, Any] | None = None,
|
||||
control_queue: queue.Queue | None = None,
|
||||
pcm_ready_event: threading.Event | None = None,
|
||||
) -> None:
|
||||
"""Decode Morse from live PCM stream and push events to *output_queue*."""
|
||||
import logging
|
||||
@@ -856,6 +857,8 @@ def morse_decoder_thread(
|
||||
|
||||
if not first_pcm_logged:
|
||||
first_pcm_logged = True
|
||||
if pcm_ready_event is not None:
|
||||
pcm_ready_event.set()
|
||||
with contextlib.suppress(queue.Full):
|
||||
output_queue.put_nowait({
|
||||
'type': 'info',
|
||||
|
||||
Reference in New Issue
Block a user