mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
morse: auto-fallback to alternate SDR device on no-PCM startup
This commit is contained in:
@@ -73,6 +73,7 @@ Auto Tone Track behavior:
|
||||
|
||||
Troubleshooting (no decode / noisy decode):
|
||||
- Confirm demod path is **USB/CW-compatible** and frequency is tuned correctly.
|
||||
- If multiple SDRs are connected and the selected one has no PCM output, Morse startup now auto-tries other detected SDR devices and reports the active device/serial in status logs.
|
||||
- Match **tone** and **bandwidth** to the actual sidetone/pitch.
|
||||
- Try **Threshold Auto** first; if needed, switch to manual threshold and recalibrate.
|
||||
- Use **Reset/Calibrate** after major frequency or band condition changes.
|
||||
|
||||
452
routes/morse.py
452
routes/morse.py
@@ -565,8 +565,29 @@ def start_morse() -> Response:
|
||||
except ValueError:
|
||||
sdr_type = SDRType.RTL_SDR
|
||||
|
||||
sdr_device = SDRFactory.create_default_device(sdr_type, index=device)
|
||||
builder = SDRFactory.get_builder(sdr_device.sdr_type)
|
||||
requested_device_index = int(device)
|
||||
active_device_index = requested_device_index
|
||||
builder = SDRFactory.get_builder(sdr_type)
|
||||
|
||||
device_catalog: dict[int, dict[str, str]] = {}
|
||||
candidate_device_indices: list[int] = [requested_device_index]
|
||||
with contextlib.suppress(Exception):
|
||||
detected_devices = SDRFactory.detect_devices()
|
||||
same_type_devices = [d for d in detected_devices if d.sdr_type == sdr_type]
|
||||
for d in same_type_devices:
|
||||
device_catalog[d.index] = {
|
||||
'name': str(d.name or f'SDR {d.index}'),
|
||||
'serial': str(d.serial or 'Unknown'),
|
||||
}
|
||||
for d in sorted(same_type_devices, key=lambda dev: dev.index):
|
||||
if d.index not in candidate_device_indices:
|
||||
candidate_device_indices.append(d.index)
|
||||
|
||||
def _device_label(device_index: int) -> str:
|
||||
meta = device_catalog.get(device_index, {})
|
||||
serial = str(meta.get('serial') or 'Unknown')
|
||||
name = str(meta.get('name') or f'SDR {device_index}')
|
||||
return f'device {device_index} ({name}, SN: {serial})'
|
||||
|
||||
multimon_path = get_tool_path('multimon-ng')
|
||||
if not multimon_path:
|
||||
@@ -582,7 +603,8 @@ def start_morse() -> Response:
|
||||
|
||||
multimon_cmd = [multimon_path, '-t', 'raw', '-a', 'MORSE_CW', '-f', 'alpha', '-']
|
||||
|
||||
def _build_rtl_cmd(direct_sampling_mode: int | None) -> list[str]:
|
||||
def _build_rtl_cmd(device_index: int, direct_sampling_mode: int | None) -> list[str]:
|
||||
sdr_device = SDRFactory.create_default_device(sdr_type, index=device_index)
|
||||
fm_kwargs: dict[str, Any] = {
|
||||
'device': sdr_device,
|
||||
'frequency_mhz': freq,
|
||||
@@ -601,7 +623,7 @@ def start_morse() -> Response:
|
||||
cmd.append('-')
|
||||
return cmd
|
||||
|
||||
can_try_direct_sampling = bool(sdr_device.sdr_type == SDRType.RTL_SDR and float(freq) < 24.0)
|
||||
can_try_direct_sampling = bool(sdr_type == SDRType.RTL_SDR and float(freq) < 24.0)
|
||||
direct_sampling_attempts: list[int | None] = [2, 1, None] if can_try_direct_sampling else [None]
|
||||
|
||||
runtime_config: dict[str, Any] = {
|
||||
@@ -619,6 +641,10 @@ def start_morse() -> Response:
|
||||
'wpm_lock': wpm_lock,
|
||||
'min_signal_gate': min_signal_gate,
|
||||
'source': 'rtl_fm',
|
||||
'requested_device': requested_device_index,
|
||||
'active_device': active_device_index,
|
||||
'device_serial': str(device_catalog.get(active_device_index, {}).get('serial') or 'Unknown'),
|
||||
'candidate_devices': list(candidate_device_indices),
|
||||
}
|
||||
|
||||
active_rtl_process: subprocess.Popen[bytes] | None = None
|
||||
@@ -679,196 +705,255 @@ def start_morse() -> Response:
|
||||
attempt_errors: list[str] = []
|
||||
|
||||
try:
|
||||
for attempt_index, direct_sampling_mode in enumerate(direct_sampling_attempts, start=1):
|
||||
rtl_process = None
|
||||
multimon_process = None
|
||||
stop_event = None
|
||||
control_queue = None
|
||||
decoder_thread = None
|
||||
stderr_thread = None
|
||||
relay_thread = None
|
||||
master_fd = None
|
||||
startup_succeeded = False
|
||||
for device_pos, candidate_device_index in enumerate(candidate_device_indices, start=1):
|
||||
if candidate_device_index != active_device_index:
|
||||
prev_device = active_device_index
|
||||
claim_error = app_module.claim_sdr_device(candidate_device_index, 'morse')
|
||||
if claim_error:
|
||||
msg = f'{_device_label(candidate_device_index)} unavailable: {claim_error}'
|
||||
attempt_errors.append(msg)
|
||||
logger.warning('Morse startup device fallback skipped: %s', msg)
|
||||
_queue_morse_event({'type': 'info', 'text': f'[morse] {msg}'})
|
||||
continue
|
||||
|
||||
if prev_device is not None:
|
||||
app_module.release_sdr_device(prev_device)
|
||||
active_device_index = candidate_device_index
|
||||
with app_module.morse_lock:
|
||||
morse_active_device = active_device_index
|
||||
|
||||
_queue_morse_event({
|
||||
'type': 'info',
|
||||
'text': (
|
||||
f'[morse] switching to {_device_label(active_device_index)} '
|
||||
f'({device_pos}/{len(candidate_device_indices)})'
|
||||
),
|
||||
})
|
||||
|
||||
runtime_config['active_device'] = active_device_index
|
||||
runtime_config['device_serial'] = str(
|
||||
device_catalog.get(active_device_index, {}).get('serial') or 'Unknown'
|
||||
)
|
||||
runtime_config.pop('startup_waiting', None)
|
||||
runtime_config.pop('startup_warning', None)
|
||||
|
||||
rtl_cmd = _build_rtl_cmd(direct_sampling_mode)
|
||||
direct_mode_label = direct_sampling_mode if direct_sampling_mode is not None else 'none'
|
||||
full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd)
|
||||
logger.info(
|
||||
'Morse decoder attempt %s/%s (source=rtl_fm direct_mode=%s): %s',
|
||||
attempt_index,
|
||||
len(direct_sampling_attempts),
|
||||
direct_mode_label,
|
||||
full_cmd,
|
||||
)
|
||||
_queue_morse_event({'type': 'info', 'text': f'[cmd] {full_cmd}'})
|
||||
for attempt_index, direct_sampling_mode in enumerate(direct_sampling_attempts, start=1):
|
||||
rtl_process = None
|
||||
multimon_process = None
|
||||
stop_event = None
|
||||
control_queue = None
|
||||
decoder_thread = None
|
||||
stderr_thread = None
|
||||
relay_thread = None
|
||||
master_fd = None
|
||||
|
||||
rtl_process = subprocess.Popen(
|
||||
rtl_cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
bufsize=0,
|
||||
)
|
||||
register_process(rtl_process)
|
||||
|
||||
stop_event = threading.Event()
|
||||
control_queue = queue.Queue(maxsize=16)
|
||||
pcm_ready_event = threading.Event()
|
||||
stderr_lines: list[str] = []
|
||||
|
||||
def monitor_stderr(
|
||||
proc: subprocess.Popen[bytes] = rtl_process,
|
||||
proc_stop_event: threading.Event = stop_event,
|
||||
capture_lines: list[str] = stderr_lines,
|
||||
) -> None:
|
||||
stderr_stream = proc.stderr
|
||||
if stderr_stream is None:
|
||||
return
|
||||
try:
|
||||
while not proc_stop_event.is_set():
|
||||
line = stderr_stream.readline()
|
||||
if not line:
|
||||
if proc.poll() is not None:
|
||||
break
|
||||
time.sleep(0.02)
|
||||
continue
|
||||
err_text = line.decode('utf-8', errors='replace').strip()
|
||||
if not err_text:
|
||||
continue
|
||||
if len(capture_lines) >= 40:
|
||||
del capture_lines[:10]
|
||||
capture_lines.append(err_text)
|
||||
_queue_morse_event({'type': 'info', 'text': f'[rtl_fm] {err_text}'})
|
||||
except (ValueError, OSError):
|
||||
return
|
||||
except Exception:
|
||||
return
|
||||
|
||||
stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr')
|
||||
stderr_thread.start()
|
||||
|
||||
master_fd, slave_fd = pty.openpty()
|
||||
try:
|
||||
multimon_process = subprocess.Popen(
|
||||
multimon_cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=slave_fd,
|
||||
stderr=slave_fd,
|
||||
close_fds=True,
|
||||
rtl_cmd = _build_rtl_cmd(active_device_index, direct_sampling_mode)
|
||||
direct_mode_label = direct_sampling_mode if direct_sampling_mode is not None else 'none'
|
||||
full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd)
|
||||
logger.info(
|
||||
'Morse decoder attempt device=%s (%s/%s) direct_mode=%s (%s/%s): %s',
|
||||
active_device_index,
|
||||
device_pos,
|
||||
len(candidate_device_indices),
|
||||
direct_mode_label,
|
||||
attempt_index,
|
||||
len(direct_sampling_attempts),
|
||||
full_cmd,
|
||||
)
|
||||
finally:
|
||||
with contextlib.suppress(OSError):
|
||||
os.close(slave_fd)
|
||||
register_process(multimon_process)
|
||||
_queue_morse_event({'type': 'info', 'text': f'[cmd] {full_cmd}'})
|
||||
|
||||
if rtl_process.stdout is None:
|
||||
raise RuntimeError('rtl_fm stdout unavailable')
|
||||
if multimon_process.stdin is None:
|
||||
raise RuntimeError('multimon-ng stdin unavailable')
|
||||
rtl_process = subprocess.Popen(
|
||||
rtl_cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
bufsize=0,
|
||||
)
|
||||
register_process(rtl_process)
|
||||
|
||||
relay_thread = threading.Thread(
|
||||
target=_morse_audio_relay_thread,
|
||||
args=(
|
||||
rtl_process.stdout,
|
||||
multimon_process.stdin,
|
||||
app_module.morse_queue,
|
||||
stop_event = threading.Event()
|
||||
control_queue = queue.Queue(maxsize=16)
|
||||
pcm_ready_event = threading.Event()
|
||||
stderr_lines: list[str] = []
|
||||
|
||||
def monitor_stderr(
|
||||
proc: subprocess.Popen[bytes] = rtl_process,
|
||||
proc_stop_event: threading.Event = stop_event,
|
||||
capture_lines: list[str] = stderr_lines,
|
||||
) -> None:
|
||||
stderr_stream = proc.stderr
|
||||
if stderr_stream is None:
|
||||
return
|
||||
try:
|
||||
while not proc_stop_event.is_set():
|
||||
line = stderr_stream.readline()
|
||||
if not line:
|
||||
if proc.poll() is not None:
|
||||
break
|
||||
time.sleep(0.02)
|
||||
continue
|
||||
err_text = line.decode('utf-8', errors='replace').strip()
|
||||
if not err_text:
|
||||
continue
|
||||
if len(capture_lines) >= 40:
|
||||
del capture_lines[:10]
|
||||
capture_lines.append(err_text)
|
||||
_queue_morse_event({'type': 'info', 'text': f'[rtl_fm] {err_text}'})
|
||||
except (ValueError, OSError):
|
||||
return
|
||||
except Exception:
|
||||
return
|
||||
|
||||
stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr')
|
||||
stderr_thread.start()
|
||||
|
||||
master_fd, slave_fd = pty.openpty()
|
||||
try:
|
||||
multimon_process = subprocess.Popen(
|
||||
multimon_cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=slave_fd,
|
||||
stderr=slave_fd,
|
||||
close_fds=True,
|
||||
)
|
||||
finally:
|
||||
with contextlib.suppress(OSError):
|
||||
os.close(slave_fd)
|
||||
register_process(multimon_process)
|
||||
|
||||
if rtl_process.stdout is None:
|
||||
raise RuntimeError('rtl_fm stdout unavailable')
|
||||
if multimon_process.stdin is None:
|
||||
raise RuntimeError('multimon-ng stdin unavailable')
|
||||
|
||||
relay_thread = threading.Thread(
|
||||
target=_morse_audio_relay_thread,
|
||||
args=(
|
||||
rtl_process.stdout,
|
||||
multimon_process.stdin,
|
||||
app_module.morse_queue,
|
||||
stop_event,
|
||||
control_queue,
|
||||
runtime_config,
|
||||
pcm_ready_event,
|
||||
),
|
||||
daemon=True,
|
||||
name='morse-relay',
|
||||
)
|
||||
relay_thread.start()
|
||||
|
||||
decoder_thread = threading.Thread(
|
||||
target=_morse_multimon_output_thread,
|
||||
args=(master_fd, multimon_process, stop_event),
|
||||
daemon=True,
|
||||
name='morse-decoder',
|
||||
)
|
||||
decoder_thread.start()
|
||||
|
||||
startup_deadline = time.monotonic() + 4.0
|
||||
startup_ok = False
|
||||
startup_error = ''
|
||||
|
||||
while time.monotonic() < startup_deadline:
|
||||
if pcm_ready_event.is_set():
|
||||
startup_ok = True
|
||||
break
|
||||
if rtl_process.poll() is not None:
|
||||
startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})'
|
||||
break
|
||||
if multimon_process.poll() is not None:
|
||||
startup_error = f'multimon-ng exited during startup (code {multimon_process.returncode})'
|
||||
break
|
||||
time.sleep(0.05)
|
||||
|
||||
if not startup_ok:
|
||||
if not startup_error:
|
||||
startup_error = 'No PCM samples received within startup timeout'
|
||||
if stderr_lines:
|
||||
startup_error = f'{startup_error}; stderr: {stderr_lines[-1]}'
|
||||
is_last_device = device_pos == len(candidate_device_indices)
|
||||
is_last_attempt = attempt_index == len(direct_sampling_attempts)
|
||||
if (
|
||||
is_last_device
|
||||
and is_last_attempt
|
||||
and rtl_process.poll() is None
|
||||
and multimon_process.poll() is None
|
||||
):
|
||||
startup_ok = True
|
||||
runtime_config['startup_waiting'] = True
|
||||
runtime_config['startup_warning'] = startup_error
|
||||
logger.warning(
|
||||
'Morse startup continuing without PCM on %s: %s',
|
||||
_device_label(active_device_index),
|
||||
startup_error,
|
||||
)
|
||||
_queue_morse_event({
|
||||
'type': 'info',
|
||||
'text': '[morse] waiting for PCM stream...',
|
||||
})
|
||||
|
||||
if startup_ok:
|
||||
runtime_config['direct_sampling_mode'] = direct_sampling_mode
|
||||
runtime_config['direct_sampling'] = (
|
||||
int(direct_sampling_mode) if direct_sampling_mode is not None else 0
|
||||
)
|
||||
runtime_config['command'] = full_cmd
|
||||
runtime_config['active_device'] = active_device_index
|
||||
|
||||
active_rtl_process = rtl_process
|
||||
active_multimon_process = multimon_process
|
||||
active_stop_event = stop_event
|
||||
active_control_queue = control_queue
|
||||
active_decoder_thread = decoder_thread
|
||||
active_stderr_thread = stderr_thread
|
||||
active_relay_thread = relay_thread
|
||||
active_master_fd = master_fd
|
||||
startup_succeeded = True
|
||||
break
|
||||
|
||||
attempt_errors.append(
|
||||
f'{_device_label(active_device_index)} '
|
||||
f'attempt {attempt_index}/{len(direct_sampling_attempts)} '
|
||||
f'(source=rtl_fm direct_mode={direct_mode_label}): {startup_error}'
|
||||
)
|
||||
logger.warning('Morse startup attempt failed: %s', attempt_errors[-1])
|
||||
_queue_morse_event({'type': 'info', 'text': f'[morse] startup attempt failed: {startup_error}'})
|
||||
|
||||
_cleanup_attempt(
|
||||
rtl_process,
|
||||
multimon_process,
|
||||
stop_event,
|
||||
control_queue,
|
||||
runtime_config,
|
||||
pcm_ready_event,
|
||||
),
|
||||
daemon=True,
|
||||
name='morse-relay',
|
||||
)
|
||||
relay_thread.start()
|
||||
|
||||
decoder_thread = threading.Thread(
|
||||
target=_morse_multimon_output_thread,
|
||||
args=(master_fd, multimon_process, stop_event),
|
||||
daemon=True,
|
||||
name='morse-decoder',
|
||||
)
|
||||
decoder_thread.start()
|
||||
|
||||
startup_deadline = time.monotonic() + 4.0
|
||||
startup_ok = False
|
||||
startup_error = ''
|
||||
|
||||
while time.monotonic() < startup_deadline:
|
||||
if pcm_ready_event.is_set():
|
||||
startup_ok = True
|
||||
break
|
||||
if rtl_process.poll() is not None:
|
||||
startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})'
|
||||
break
|
||||
if multimon_process.poll() is not None:
|
||||
startup_error = f'multimon-ng exited during startup (code {multimon_process.returncode})'
|
||||
break
|
||||
time.sleep(0.05)
|
||||
|
||||
if not startup_ok:
|
||||
if not startup_error:
|
||||
startup_error = 'No PCM samples received within startup timeout'
|
||||
if stderr_lines:
|
||||
startup_error = f'{startup_error}; stderr: {stderr_lines[-1]}'
|
||||
is_last_attempt = attempt_index == len(direct_sampling_attempts)
|
||||
if is_last_attempt and rtl_process.poll() is None and multimon_process.poll() is None:
|
||||
startup_ok = True
|
||||
runtime_config['startup_waiting'] = True
|
||||
runtime_config['startup_warning'] = startup_error
|
||||
logger.warning(
|
||||
'Morse startup continuing without PCM (attempt %s/%s): %s',
|
||||
attempt_index,
|
||||
len(direct_sampling_attempts),
|
||||
startup_error,
|
||||
)
|
||||
_queue_morse_event({
|
||||
'type': 'info',
|
||||
'text': '[morse] waiting for PCM stream...',
|
||||
})
|
||||
|
||||
if startup_ok:
|
||||
runtime_config['direct_sampling_mode'] = direct_sampling_mode
|
||||
runtime_config['direct_sampling'] = (
|
||||
int(direct_sampling_mode) if direct_sampling_mode is not None else 0
|
||||
decoder_thread,
|
||||
stderr_thread,
|
||||
relay_thread,
|
||||
master_fd,
|
||||
)
|
||||
runtime_config['command'] = full_cmd
|
||||
rtl_process = None
|
||||
multimon_process = None
|
||||
stop_event = None
|
||||
control_queue = None
|
||||
decoder_thread = None
|
||||
stderr_thread = None
|
||||
relay_thread = None
|
||||
master_fd = None
|
||||
|
||||
active_rtl_process = rtl_process
|
||||
active_multimon_process = multimon_process
|
||||
active_stop_event = stop_event
|
||||
active_control_queue = control_queue
|
||||
active_decoder_thread = decoder_thread
|
||||
active_stderr_thread = stderr_thread
|
||||
active_relay_thread = relay_thread
|
||||
active_master_fd = master_fd
|
||||
if startup_succeeded:
|
||||
break
|
||||
|
||||
attempt_errors.append(
|
||||
f'attempt {attempt_index}/{len(direct_sampling_attempts)} '
|
||||
f'(source=rtl_fm direct_mode={direct_mode_label}): {startup_error}'
|
||||
)
|
||||
logger.warning('Morse startup attempt failed: %s', attempt_errors[-1])
|
||||
_queue_morse_event({'type': 'info', 'text': f'[morse] startup attempt failed: {startup_error}'})
|
||||
|
||||
_cleanup_attempt(
|
||||
rtl_process,
|
||||
multimon_process,
|
||||
stop_event,
|
||||
control_queue,
|
||||
decoder_thread,
|
||||
stderr_thread,
|
||||
relay_thread,
|
||||
master_fd,
|
||||
)
|
||||
rtl_process = None
|
||||
multimon_process = None
|
||||
stop_event = None
|
||||
control_queue = None
|
||||
decoder_thread = None
|
||||
stderr_thread = None
|
||||
relay_thread = None
|
||||
master_fd = None
|
||||
if device_pos < len(candidate_device_indices):
|
||||
next_device = candidate_device_indices[device_pos]
|
||||
_queue_morse_event({
|
||||
'type': 'status',
|
||||
'state': MORSE_STARTING,
|
||||
'status': MORSE_STARTING,
|
||||
'message': (
|
||||
f'No PCM on {_device_label(active_device_index)}. '
|
||||
f'Trying {_device_label(next_device)}...'
|
||||
),
|
||||
'session_id': morse_session_id,
|
||||
'timestamp': time.strftime('%H:%M:%S'),
|
||||
})
|
||||
|
||||
if (
|
||||
active_rtl_process is None
|
||||
@@ -880,7 +965,10 @@ def start_morse() -> Response:
|
||||
or active_relay_thread is None
|
||||
or active_master_fd is None
|
||||
):
|
||||
msg = 'SDR capture started but no PCM stream was received.'
|
||||
msg = (
|
||||
f'SDR capture started but no PCM stream was received from '
|
||||
f'{_device_label(active_device_index)}.'
|
||||
)
|
||||
if attempt_errors:
|
||||
msg += ' ' + ' | '.join(attempt_errors)
|
||||
logger.error('Morse startup failed: %s', msg)
|
||||
|
||||
@@ -269,6 +269,7 @@ class TestMorseLifecycleRoutes:
|
||||
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice()))
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder()))
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'detect_devices', staticmethod(lambda: []))
|
||||
monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng')
|
||||
|
||||
pcm = generate_morse_audio('E', wpm=15, sample_rate=22050)
|
||||
@@ -373,6 +374,7 @@ class TestMorseLifecycleRoutes:
|
||||
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice()))
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder()))
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'detect_devices', staticmethod(lambda: []))
|
||||
monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng')
|
||||
|
||||
pcm = generate_morse_audio('E', wpm=15, sample_rate=22050)
|
||||
@@ -455,6 +457,127 @@ class TestMorseLifecycleRoutes:
|
||||
assert stop_resp.get_json()['status'] == 'stopped'
|
||||
assert 0 in released_devices
|
||||
|
||||
def test_start_falls_back_to_next_device_when_selected_device_has_no_pcm(self, client, monkeypatch):
|
||||
_login_session(client)
|
||||
self._reset_route_state()
|
||||
|
||||
released_devices = []
|
||||
|
||||
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None)
|
||||
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx))
|
||||
monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng')
|
||||
|
||||
class DummyDevice:
|
||||
def __init__(self, index: int):
|
||||
self.sdr_type = morse_routes.SDRType.RTL_SDR
|
||||
self.index = index
|
||||
|
||||
class DummyDetected:
|
||||
def __init__(self, index: int, serial: str):
|
||||
self.sdr_type = morse_routes.SDRType.RTL_SDR
|
||||
self.index = index
|
||||
self.name = f'RTL {index}'
|
||||
self.serial = serial
|
||||
|
||||
class DummyBuilder:
|
||||
def build_fm_demod_command(self, **kwargs):
|
||||
cmd = ['rtl_fm', '-d', str(kwargs['device'].index), '-f', '14.060M', '-M', 'usb', '-s', '22050']
|
||||
if kwargs.get('direct_sampling') is not None:
|
||||
cmd.extend(['--direct', str(kwargs['direct_sampling'])])
|
||||
cmd.append('-')
|
||||
return cmd
|
||||
|
||||
monkeypatch.setattr(
|
||||
morse_routes.SDRFactory,
|
||||
'create_default_device',
|
||||
staticmethod(lambda sdr_type, index: DummyDevice(int(index))),
|
||||
)
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder()))
|
||||
monkeypatch.setattr(
|
||||
morse_routes.SDRFactory,
|
||||
'detect_devices',
|
||||
staticmethod(lambda: [DummyDetected(0, 'AAA00000'), DummyDetected(1, 'BBB11111')]),
|
||||
)
|
||||
|
||||
pcm = generate_morse_audio('E', wpm=15, sample_rate=22050)
|
||||
|
||||
class FakeRtlProc:
|
||||
def __init__(self, stdout_bytes: bytes, returncode: int | None):
|
||||
self.stdout = io.BytesIO(stdout_bytes)
|
||||
self.stderr = io.BytesIO(b'')
|
||||
self.returncode = returncode
|
||||
|
||||
def poll(self):
|
||||
return self.returncode
|
||||
|
||||
def terminate(self):
|
||||
self.returncode = 0
|
||||
|
||||
def wait(self, timeout=None):
|
||||
self.returncode = 0
|
||||
return 0
|
||||
|
||||
def kill(self):
|
||||
self.returncode = -9
|
||||
|
||||
class FakeMultimonProc:
|
||||
def __init__(self):
|
||||
self.stdin = io.BytesIO()
|
||||
self.returncode = None
|
||||
|
||||
def poll(self):
|
||||
return self.returncode
|
||||
|
||||
def terminate(self):
|
||||
self.returncode = 0
|
||||
|
||||
def wait(self, timeout=None):
|
||||
self.returncode = 0
|
||||
return 0
|
||||
|
||||
def kill(self):
|
||||
self.returncode = -9
|
||||
|
||||
def fake_popen(cmd, *args, **kwargs):
|
||||
if 'multimon' in str(cmd[0]):
|
||||
return FakeMultimonProc()
|
||||
try:
|
||||
dev = int(cmd[cmd.index('-d') + 1])
|
||||
except Exception:
|
||||
dev = 0
|
||||
if dev == 0:
|
||||
return FakeRtlProc(b'', 1)
|
||||
return FakeRtlProc(pcm, None)
|
||||
|
||||
monkeypatch.setattr(morse_routes.subprocess, 'Popen', fake_popen)
|
||||
monkeypatch.setattr(morse_routes, 'register_process', lambda _proc: None)
|
||||
monkeypatch.setattr(morse_routes, 'unregister_process', lambda _proc: None)
|
||||
monkeypatch.setattr(
|
||||
morse_routes,
|
||||
'safe_terminate',
|
||||
lambda proc, timeout=0.0: setattr(proc, 'returncode', 0),
|
||||
)
|
||||
|
||||
start_resp = client.post('/morse/start', json={
|
||||
'frequency': '14.060',
|
||||
'gain': '20',
|
||||
'ppm': '0',
|
||||
'device': '0',
|
||||
'tone_freq': '700',
|
||||
'wpm': '15',
|
||||
})
|
||||
assert start_resp.status_code == 200
|
||||
start_data = start_resp.get_json()
|
||||
assert start_data['status'] == 'started'
|
||||
assert start_data['config']['active_device'] == 1
|
||||
assert start_data['config']['device_serial'] == 'BBB11111'
|
||||
assert 0 in released_devices
|
||||
|
||||
stop_resp = client.post('/morse/stop')
|
||||
assert stop_resp.status_code == 200
|
||||
assert stop_resp.get_json()['status'] == 'stopped'
|
||||
assert 1 in released_devices
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration: synthetic CW -> WAV decode
|
||||
|
||||
Reference in New Issue
Block a user