mirror of
https://github.com/smittix/intercept.git
synced 2026-07-03 07:14:00 -07:00
Fix Morse mode lifecycle stop hangs and rebuild CW decoder
This commit is contained in:
+348
-467
@@ -1,467 +1,348 @@
|
||||
"""Tests for Morse code decoder (utils/morse.py) and routes."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import queue
|
||||
import struct
|
||||
import threading
|
||||
|
||||
import pytest
|
||||
|
||||
from utils.morse import (
|
||||
CHAR_TO_MORSE,
|
||||
MORSE_TABLE,
|
||||
GoertzelFilter,
|
||||
MorseDecoder,
|
||||
morse_decoder_thread,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _login_session(client) -> None:
|
||||
"""Mark the Flask test session as authenticated."""
|
||||
with client.session_transaction() as sess:
|
||||
sess['logged_in'] = True
|
||||
sess['username'] = 'test'
|
||||
sess['role'] = 'admin'
|
||||
|
||||
|
||||
def generate_tone(freq: float, duration: float, sample_rate: int = 8000, amplitude: float = 0.8) -> bytes:
|
||||
"""Generate a pure sine wave as 16-bit LE PCM bytes."""
|
||||
n_samples = int(sample_rate * duration)
|
||||
samples = []
|
||||
for i in range(n_samples):
|
||||
t = i / sample_rate
|
||||
val = int(amplitude * 32767 * math.sin(2 * math.pi * freq * t))
|
||||
samples.append(max(-32768, min(32767, val)))
|
||||
return struct.pack(f'<{len(samples)}h', *samples)
|
||||
|
||||
|
||||
def generate_silence(duration: float, sample_rate: int = 8000) -> bytes:
|
||||
"""Generate silence as 16-bit LE PCM bytes."""
|
||||
n_samples = int(sample_rate * duration)
|
||||
return b'\x00\x00' * n_samples
|
||||
|
||||
|
||||
def generate_morse_audio(text: str, wpm: int = 15, tone_freq: float = 700.0, sample_rate: int = 8000) -> bytes:
|
||||
"""Generate PCM audio for a Morse-encoded string."""
|
||||
dit_dur = 1.2 / wpm
|
||||
dah_dur = 3 * dit_dur
|
||||
element_gap = dit_dur
|
||||
char_gap = 3 * dit_dur
|
||||
word_gap = 7 * dit_dur
|
||||
|
||||
audio = b''
|
||||
words = text.upper().split()
|
||||
for wi, word in enumerate(words):
|
||||
for ci, char in enumerate(word):
|
||||
morse = CHAR_TO_MORSE.get(char)
|
||||
if morse is None:
|
||||
continue
|
||||
for ei, element in enumerate(morse):
|
||||
if element == '.':
|
||||
audio += generate_tone(tone_freq, dit_dur, sample_rate)
|
||||
elif element == '-':
|
||||
audio += generate_tone(tone_freq, dah_dur, sample_rate)
|
||||
if ei < len(morse) - 1:
|
||||
audio += generate_silence(element_gap, sample_rate)
|
||||
if ci < len(word) - 1:
|
||||
audio += generate_silence(char_gap, sample_rate)
|
||||
if wi < len(words) - 1:
|
||||
audio += generate_silence(word_gap, sample_rate)
|
||||
|
||||
# Add some leading/trailing silence for threshold settling
|
||||
silence = generate_silence(0.3, sample_rate)
|
||||
return silence + audio + silence
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MORSE_TABLE tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMorseTable:
|
||||
def test_all_26_letters_present(self):
|
||||
chars = set(MORSE_TABLE.values())
|
||||
for letter in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
|
||||
assert letter in chars, f"Missing letter: {letter}"
|
||||
|
||||
def test_all_10_digits_present(self):
|
||||
chars = set(MORSE_TABLE.values())
|
||||
for digit in '0123456789':
|
||||
assert digit in chars, f"Missing digit: {digit}"
|
||||
|
||||
def test_reverse_lookup_consistent(self):
|
||||
for morse, char in MORSE_TABLE.items():
|
||||
if char in CHAR_TO_MORSE:
|
||||
assert CHAR_TO_MORSE[char] == morse
|
||||
|
||||
def test_no_duplicate_morse_codes(self):
|
||||
"""Each morse pattern should map to exactly one character."""
|
||||
assert len(MORSE_TABLE) == len(set(MORSE_TABLE.keys()))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GoertzelFilter tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGoertzelFilter:
|
||||
def test_detects_target_frequency(self):
|
||||
gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=160)
|
||||
# Generate 700 Hz tone
|
||||
samples = [0.8 * math.sin(2 * math.pi * 700 * i / 8000) for i in range(160)]
|
||||
mag = gf.magnitude(samples)
|
||||
assert mag > 10.0, f"Expected high magnitude for target freq, got {mag}"
|
||||
|
||||
def test_rejects_off_frequency(self):
|
||||
gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=160)
|
||||
# Generate 1500 Hz tone (well off target)
|
||||
samples = [0.8 * math.sin(2 * math.pi * 1500 * i / 8000) for i in range(160)]
|
||||
mag_off = gf.magnitude(samples)
|
||||
|
||||
# Compare with on-target
|
||||
samples_on = [0.8 * math.sin(2 * math.pi * 700 * i / 8000) for i in range(160)]
|
||||
mag_on = gf.magnitude(samples_on)
|
||||
|
||||
assert mag_on > mag_off * 3, "Target freq should be significantly stronger than off-freq"
|
||||
|
||||
def test_silence_returns_near_zero(self):
|
||||
gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=160)
|
||||
samples = [0.0] * 160
|
||||
mag = gf.magnitude(samples)
|
||||
assert mag < 0.01, f"Expected near-zero for silence, got {mag}"
|
||||
|
||||
def test_different_block_sizes(self):
|
||||
for block_size in [80, 160, 320]:
|
||||
gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=block_size)
|
||||
samples = [0.8 * math.sin(2 * math.pi * 700 * i / 8000) for i in range(block_size)]
|
||||
mag = gf.magnitude(samples)
|
||||
assert mag > 5.0, f"Should detect tone with block_size={block_size}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MorseDecoder tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMorseDecoder:
|
||||
def _make_decoder(self, wpm=15):
|
||||
"""Create decoder with warm-up phase completed for testing.
|
||||
|
||||
Feeds silence then tone then silence to get past the warm-up
|
||||
blocks and establish a valid noise floor / signal peak.
|
||||
"""
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=wpm)
|
||||
# Feed enough audio to get past warm-up (50 blocks = 1 sec)
|
||||
# Mix silence and tone so warm-up sees both noise and signal
|
||||
warmup_audio = generate_silence(0.6) + generate_tone(700.0, 0.4) + generate_silence(0.5)
|
||||
decoder.process_block(warmup_audio)
|
||||
# Reset state machine after warm-up so tests start clean
|
||||
decoder._tone_on = False
|
||||
decoder._current_symbol = ''
|
||||
decoder._tone_blocks = 0
|
||||
decoder._silence_blocks = 0
|
||||
return decoder
|
||||
|
||||
def test_dit_detection(self):
|
||||
"""A single dit should produce a '.' in the symbol buffer."""
|
||||
decoder = self._make_decoder()
|
||||
dit_dur = 1.2 / 15
|
||||
|
||||
# Send a tone burst (dit)
|
||||
tone = generate_tone(700.0, dit_dur)
|
||||
decoder.process_block(tone)
|
||||
|
||||
# Send silence to trigger end of tone
|
||||
silence = generate_silence(dit_dur * 2)
|
||||
decoder.process_block(silence)
|
||||
|
||||
# Symbol buffer should have a dot
|
||||
assert '.' in decoder._current_symbol, f"Expected '.' in symbol, got '{decoder._current_symbol}'"
|
||||
|
||||
def test_dah_detection(self):
|
||||
"""A longer tone should produce a '-' in the symbol buffer."""
|
||||
decoder = self._make_decoder()
|
||||
dah_dur = 3 * 1.2 / 15
|
||||
|
||||
tone = generate_tone(700.0, dah_dur)
|
||||
decoder.process_block(tone)
|
||||
|
||||
silence = generate_silence(dah_dur)
|
||||
decoder.process_block(silence)
|
||||
|
||||
assert '-' in decoder._current_symbol, f"Expected '-' in symbol, got '{decoder._current_symbol}'"
|
||||
|
||||
def test_decode_letter_e(self):
|
||||
"""E is a single dit - the simplest character."""
|
||||
decoder = self._make_decoder()
|
||||
audio = generate_morse_audio('E', wpm=15)
|
||||
events = decoder.process_block(audio)
|
||||
events.extend(decoder.flush())
|
||||
|
||||
chars = [e for e in events if e['type'] == 'morse_char']
|
||||
decoded = ''.join(e['char'] for e in chars)
|
||||
assert 'E' in decoded, f"Expected 'E' in decoded text, got '{decoded}'"
|
||||
|
||||
def test_decode_letter_t(self):
|
||||
"""T is a single dah."""
|
||||
decoder = self._make_decoder()
|
||||
audio = generate_morse_audio('T', wpm=15)
|
||||
events = decoder.process_block(audio)
|
||||
events.extend(decoder.flush())
|
||||
|
||||
chars = [e for e in events if e['type'] == 'morse_char']
|
||||
decoded = ''.join(e['char'] for e in chars)
|
||||
assert 'T' in decoded, f"Expected 'T' in decoded text, got '{decoded}'"
|
||||
|
||||
def test_word_space_detection(self):
|
||||
"""A long silence between words should produce decoded chars with a space."""
|
||||
decoder = self._make_decoder()
|
||||
dit_dur = 1.2 / 15
|
||||
# E = dit
|
||||
audio = generate_tone(700.0, dit_dur) + generate_silence(7 * dit_dur * 1.5)
|
||||
# T = dah
|
||||
audio += generate_tone(700.0, 3 * dit_dur) + generate_silence(3 * dit_dur)
|
||||
events = decoder.process_block(audio)
|
||||
events.extend(decoder.flush())
|
||||
|
||||
spaces = [e for e in events if e['type'] == 'morse_space']
|
||||
assert len(spaces) >= 1, "Expected at least one word space"
|
||||
|
||||
def test_scope_events_generated(self):
|
||||
"""Decoder should produce scope events for visualization."""
|
||||
audio = generate_morse_audio('SOS', wpm=15)
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15)
|
||||
|
||||
events = decoder.process_block(audio)
|
||||
|
||||
scope_events = [e for e in events if e['type'] == 'scope']
|
||||
assert len(scope_events) > 0, "Expected scope events"
|
||||
# Check scope event structure
|
||||
se = scope_events[0]
|
||||
assert 'amplitudes' in se
|
||||
assert 'threshold' in se
|
||||
assert 'tone_on' in se
|
||||
|
||||
def test_adaptive_threshold_adjusts(self):
|
||||
"""After processing enough audio to complete warm-up, threshold should be non-zero."""
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15)
|
||||
|
||||
# Feed enough audio to complete the 50-block warm-up (~1 second)
|
||||
audio = generate_silence(0.6) + generate_tone(700.0, 0.4) + generate_silence(0.3)
|
||||
decoder.process_block(audio)
|
||||
|
||||
assert decoder._threshold > 0, "Threshold should adapt above zero after warm-up"
|
||||
|
||||
def test_flush_emits_pending_char(self):
|
||||
"""flush() should emit any accumulated but not-yet-decoded symbol."""
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15)
|
||||
decoder._current_symbol = '.' # Manually set pending dit
|
||||
events = decoder.flush()
|
||||
assert len(events) == 1
|
||||
assert events[0]['type'] == 'morse_char'
|
||||
assert events[0]['char'] == 'E'
|
||||
|
||||
def test_flush_empty_returns_nothing(self):
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15)
|
||||
events = decoder.flush()
|
||||
assert events == []
|
||||
|
||||
def test_weak_signal_detection(self):
|
||||
"""CW tone at only 3x noise magnitude should still decode characters."""
|
||||
decoder = self._make_decoder(wpm=10)
|
||||
# Generate weak CW audio (low amplitude simulating weak HF signal)
|
||||
audio = generate_morse_audio('SOS', wpm=10, sample_rate=8000)
|
||||
# Scale to low amplitude (simulating weak signal)
|
||||
n_samples = len(audio) // 2
|
||||
samples = struct.unpack(f'<{n_samples}h', audio)
|
||||
# Reduce to ~10% amplitude
|
||||
weak_samples = [max(-32768, min(32767, int(s * 0.1))) for s in samples]
|
||||
weak_audio = struct.pack(f'<{len(weak_samples)}h', *weak_samples)
|
||||
|
||||
events = decoder.process_block(weak_audio)
|
||||
events.extend(decoder.flush())
|
||||
|
||||
chars = [e for e in events if e['type'] == 'morse_char']
|
||||
decoded = ''.join(e['char'] for e in chars)
|
||||
# Should decode at least some characters from the weak signal
|
||||
assert len(chars) >= 1, f"Expected decoded chars from weak signal, got '{decoded}'"
|
||||
|
||||
def test_agc_boosts_quiet_signal(self):
|
||||
"""Very quiet PCM (amplitude 0.01) should still produce usable Goertzel magnitudes."""
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15)
|
||||
# Generate very quiet tone
|
||||
quiet_tone = generate_tone(700.0, 1.5, amplitude=0.01) # 1.5s of very quiet CW
|
||||
events = decoder.process_block(quiet_tone)
|
||||
|
||||
scope_events = [e for e in events if e['type'] == 'scope']
|
||||
assert len(scope_events) > 0, "Expected scope events from quiet signal"
|
||||
# AGC should have boosted the signal — amplitudes should be visible
|
||||
max_amp = max(max(se['amplitudes']) for se in scope_events)
|
||||
assert max_amp > 1.0, f"AGC should boost quiet signal to usable magnitude, got {max_amp}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# morse_decoder_thread tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMorseDecoderThread:
|
||||
def test_thread_stops_on_event(self):
|
||||
"""Thread should exit when stop_event is set."""
|
||||
import io
|
||||
# Create a fake stdout that blocks until stop
|
||||
stop = threading.Event()
|
||||
q = queue.Queue(maxsize=100)
|
||||
|
||||
# Feed some audio then close
|
||||
audio = generate_morse_audio('E', wpm=15)
|
||||
fake_stdout = io.BytesIO(audio)
|
||||
|
||||
t = threading.Thread(
|
||||
target=morse_decoder_thread,
|
||||
args=(fake_stdout, q, stop),
|
||||
)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
t.join(timeout=5)
|
||||
assert not t.is_alive(), "Thread should finish after reading all data"
|
||||
|
||||
def test_thread_heartbeat_on_no_data(self):
|
||||
"""When rtl_fm produces no data, thread should emit waiting scope events."""
|
||||
import os as _os
|
||||
stop = threading.Event()
|
||||
q = queue.Queue(maxsize=100)
|
||||
|
||||
# Create a pipe that never gets written to (simulates rtl_fm with no output)
|
||||
read_fd, write_fd = _os.pipe()
|
||||
read_file = _os.fdopen(read_fd, 'rb', 0)
|
||||
|
||||
t = threading.Thread(
|
||||
target=morse_decoder_thread,
|
||||
args=(read_file, q, stop),
|
||||
)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
# Wait up to 5 seconds for at least one heartbeat event
|
||||
events = []
|
||||
import time as _time
|
||||
deadline = _time.monotonic() + 5.0
|
||||
while _time.monotonic() < deadline:
|
||||
try:
|
||||
ev = q.get(timeout=0.5)
|
||||
events.append(ev)
|
||||
if ev.get('waiting'):
|
||||
break
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
stop.set()
|
||||
_os.close(write_fd)
|
||||
read_file.close()
|
||||
t.join(timeout=3)
|
||||
|
||||
waiting_events = [e for e in events if e.get('type') == 'scope' and e.get('waiting')]
|
||||
assert len(waiting_events) >= 1, f"Expected waiting heartbeat events, got {events}"
|
||||
ev = waiting_events[0]
|
||||
assert ev['amplitudes'] == []
|
||||
assert ev['threshold'] == 0
|
||||
assert ev['tone_on'] is False
|
||||
|
||||
def test_thread_produces_events(self):
|
||||
"""Thread should push character events to the queue."""
|
||||
import io
|
||||
from unittest.mock import patch
|
||||
stop = threading.Event()
|
||||
q = queue.Queue(maxsize=1000)
|
||||
|
||||
# Generate audio with pre-warmed decoder in mind
|
||||
# The thread creates a fresh decoder, so generate lots of audio
|
||||
audio = generate_silence(0.5) + generate_morse_audio('SOS', wpm=10) + generate_silence(1.0)
|
||||
fake_stdout = io.BytesIO(audio)
|
||||
|
||||
# Patch SCOPE_INTERVAL to 0 so scope events aren't throttled in fast reads
|
||||
with patch('utils.morse.time') as mock_time:
|
||||
# Make monotonic() always return increasing values
|
||||
counter = [0.0]
|
||||
def fake_monotonic():
|
||||
counter[0] += 0.15 # each call advances 150ms
|
||||
return counter[0]
|
||||
mock_time.monotonic = fake_monotonic
|
||||
|
||||
t = threading.Thread(
|
||||
target=morse_decoder_thread,
|
||||
args=(fake_stdout, q, stop),
|
||||
)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
t.join(timeout=10)
|
||||
|
||||
events = []
|
||||
while not q.empty():
|
||||
events.append(q.get_nowait())
|
||||
|
||||
# Should have at least some events (scope or char)
|
||||
assert len(events) > 0, "Expected events from thread"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Route tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMorseRoutes:
|
||||
def test_start_missing_required_fields(self, client):
|
||||
"""Start should succeed with defaults."""
|
||||
_login_session(client)
|
||||
with pytest.MonkeyPatch.context() as m:
|
||||
m.setattr('app.morse_process', None)
|
||||
# Should fail because rtl_fm won't be found in test env
|
||||
resp = client.post('/morse/start', json={'frequency': '14.060'})
|
||||
assert resp.status_code in (200, 400, 409, 500)
|
||||
|
||||
def test_stop_when_not_running(self, client):
|
||||
"""Stop when nothing is running should return not_running."""
|
||||
_login_session(client)
|
||||
with pytest.MonkeyPatch.context() as m:
|
||||
m.setattr('app.morse_process', None)
|
||||
resp = client.post('/morse/stop')
|
||||
data = resp.get_json()
|
||||
assert data['status'] == 'not_running'
|
||||
|
||||
def test_status_when_not_running(self, client):
|
||||
"""Status should report not running."""
|
||||
_login_session(client)
|
||||
with pytest.MonkeyPatch.context() as m:
|
||||
m.setattr('app.morse_process', None)
|
||||
resp = client.get('/morse/status')
|
||||
data = resp.get_json()
|
||||
assert data['running'] is False
|
||||
|
||||
def test_invalid_tone_freq(self, client):
|
||||
"""Tone frequency outside range should be rejected."""
|
||||
_login_session(client)
|
||||
with pytest.MonkeyPatch.context() as m:
|
||||
m.setattr('app.morse_process', None)
|
||||
resp = client.post('/morse/start', json={
|
||||
'frequency': '14.060',
|
||||
'tone_freq': '50', # too low
|
||||
})
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_invalid_wpm(self, client):
|
||||
"""WPM outside range should be rejected."""
|
||||
_login_session(client)
|
||||
with pytest.MonkeyPatch.context() as m:
|
||||
m.setattr('app.morse_process', None)
|
||||
resp = client.post('/morse/start', json={
|
||||
'frequency': '14.060',
|
||||
'wpm': '100', # too high
|
||||
})
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_stream_endpoint_exists(self, client):
|
||||
"""Stream endpoint should return SSE content type."""
|
||||
_login_session(client)
|
||||
resp = client.get('/morse/stream')
|
||||
assert resp.content_type.startswith('text/event-stream')
|
||||
"""Tests for Morse code decoder pipeline and lifecycle routes."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import math
|
||||
import os
|
||||
import queue
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
import wave
|
||||
from collections import Counter
|
||||
|
||||
import pytest
|
||||
|
||||
import app as app_module
|
||||
import routes.morse as morse_routes
|
||||
from utils.morse import (
|
||||
CHAR_TO_MORSE,
|
||||
MORSE_TABLE,
|
||||
GoertzelFilter,
|
||||
MorseDecoder,
|
||||
decode_morse_wav_file,
|
||||
morse_decoder_thread,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _login_session(client) -> None:
|
||||
"""Mark the Flask test session as authenticated."""
|
||||
with client.session_transaction() as sess:
|
||||
sess['logged_in'] = True
|
||||
sess['username'] = 'test'
|
||||
sess['role'] = 'admin'
|
||||
|
||||
|
||||
def generate_tone(freq: float, duration: float, sample_rate: int = 8000, amplitude: float = 0.8) -> bytes:
|
||||
"""Generate a pure sine wave as 16-bit LE PCM bytes."""
|
||||
n_samples = int(sample_rate * duration)
|
||||
samples = []
|
||||
for i in range(n_samples):
|
||||
t = i / sample_rate
|
||||
val = int(amplitude * 32767 * math.sin(2 * math.pi * freq * t))
|
||||
samples.append(max(-32768, min(32767, val)))
|
||||
return struct.pack(f'<{len(samples)}h', *samples)
|
||||
|
||||
|
||||
def generate_silence(duration: float, sample_rate: int = 8000) -> bytes:
|
||||
"""Generate silence as 16-bit LE PCM bytes."""
|
||||
n_samples = int(sample_rate * duration)
|
||||
return b'\x00\x00' * n_samples
|
||||
|
||||
|
||||
def generate_morse_audio(text: str, wpm: int = 15, tone_freq: float = 700.0, sample_rate: int = 8000) -> bytes:
|
||||
"""Generate synthetic CW PCM for the given text."""
|
||||
dit_dur = 1.2 / wpm
|
||||
dah_dur = 3 * dit_dur
|
||||
element_gap = dit_dur
|
||||
char_gap = 3 * dit_dur
|
||||
word_gap = 7 * dit_dur
|
||||
|
||||
audio = b''
|
||||
words = text.upper().split()
|
||||
for wi, word in enumerate(words):
|
||||
for ci, char in enumerate(word):
|
||||
morse = CHAR_TO_MORSE.get(char)
|
||||
if morse is None:
|
||||
continue
|
||||
|
||||
for ei, element in enumerate(morse):
|
||||
if element == '.':
|
||||
audio += generate_tone(tone_freq, dit_dur, sample_rate)
|
||||
elif element == '-':
|
||||
audio += generate_tone(tone_freq, dah_dur, sample_rate)
|
||||
|
||||
if ei < len(morse) - 1:
|
||||
audio += generate_silence(element_gap, sample_rate)
|
||||
|
||||
if ci < len(word) - 1:
|
||||
audio += generate_silence(char_gap, sample_rate)
|
||||
|
||||
if wi < len(words) - 1:
|
||||
audio += generate_silence(word_gap, sample_rate)
|
||||
|
||||
# Leading/trailing silence for threshold settling.
|
||||
return generate_silence(0.3, sample_rate) + audio + generate_silence(0.3, sample_rate)
|
||||
|
||||
|
||||
def write_wav(path, pcm_bytes: bytes, sample_rate: int = 8000) -> None:
|
||||
"""Write mono 16-bit PCM bytes to a WAV file."""
|
||||
with wave.open(str(path), 'wb') as wf:
|
||||
wf.setnchannels(1)
|
||||
wf.setsampwidth(2)
|
||||
wf.setframerate(sample_rate)
|
||||
wf.writeframes(pcm_bytes)
|
||||
|
||||
|
||||
def decode_text_from_events(events) -> str:
|
||||
out = []
|
||||
for ev in events:
|
||||
if ev.get('type') == 'morse_char':
|
||||
out.append(str(ev.get('char', '')))
|
||||
elif ev.get('type') == 'morse_space':
|
||||
out.append(' ')
|
||||
return ''.join(out)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMorseTable:
|
||||
def test_morse_table_contains_letters_and_digits(self):
|
||||
chars = set(MORSE_TABLE.values())
|
||||
for ch in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789':
|
||||
assert ch in chars
|
||||
|
||||
def test_round_trip_morse_lookup(self):
|
||||
for morse, char in MORSE_TABLE.items():
|
||||
if char in CHAR_TO_MORSE:
|
||||
assert CHAR_TO_MORSE[char] == morse
|
||||
|
||||
|
||||
class TestToneDetector:
|
||||
def test_goertzel_prefers_target_frequency(self):
|
||||
gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=160)
|
||||
on_tone = [0.8 * math.sin(2 * math.pi * 700.0 * i / 8000.0) for i in range(160)]
|
||||
off_tone = [0.8 * math.sin(2 * math.pi * 1500.0 * i / 8000.0) for i in range(160)]
|
||||
assert gf.magnitude(on_tone) > gf.magnitude(off_tone) * 3.0
|
||||
|
||||
|
||||
class TestTimingAndWpmEstimator:
|
||||
def test_timing_classifier_distinguishes_dit_and_dah(self):
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15)
|
||||
dit = 1.2 / 15.0
|
||||
dah = dit * 3.0
|
||||
|
||||
audio = (
|
||||
generate_silence(0.35)
|
||||
+ generate_tone(700.0, dit)
|
||||
+ generate_silence(dit * 1.5)
|
||||
+ generate_tone(700.0, dah)
|
||||
+ generate_silence(0.35)
|
||||
)
|
||||
|
||||
events = decoder.process_block(audio)
|
||||
events.extend(decoder.flush())
|
||||
elements = [e['element'] for e in events if e.get('type') == 'morse_element']
|
||||
|
||||
assert '.' in elements
|
||||
assert '-' in elements
|
||||
|
||||
def test_wpm_estimator_sanity(self):
|
||||
target_wpm = 18
|
||||
audio = generate_morse_audio('PARIS PARIS PARIS', wpm=target_wpm)
|
||||
decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=12, wpm_mode='auto')
|
||||
|
||||
events = decoder.process_block(audio)
|
||||
events.extend(decoder.flush())
|
||||
|
||||
metrics = decoder.get_metrics()
|
||||
assert metrics['wpm'] >= 12.0
|
||||
assert metrics['wpm'] <= 24.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Decoder thread tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMorseDecoderThread:
|
||||
def test_thread_emits_waiting_heartbeat_on_no_data(self):
|
||||
stop_event = threading.Event()
|
||||
output_queue = queue.Queue(maxsize=64)
|
||||
|
||||
read_fd, write_fd = os.pipe()
|
||||
read_file = os.fdopen(read_fd, 'rb', 0)
|
||||
|
||||
worker = threading.Thread(
|
||||
target=morse_decoder_thread,
|
||||
args=(read_file, output_queue, stop_event),
|
||||
daemon=True,
|
||||
)
|
||||
worker.start()
|
||||
|
||||
got_waiting = False
|
||||
deadline = time.monotonic() + 3.5
|
||||
while time.monotonic() < deadline:
|
||||
try:
|
||||
msg = output_queue.get(timeout=0.3)
|
||||
except queue.Empty:
|
||||
continue
|
||||
if msg.get('type') == 'scope' and msg.get('waiting'):
|
||||
got_waiting = True
|
||||
break
|
||||
|
||||
stop_event.set()
|
||||
os.close(write_fd)
|
||||
read_file.close()
|
||||
worker.join(timeout=2.0)
|
||||
|
||||
assert got_waiting is True
|
||||
assert not worker.is_alive()
|
||||
|
||||
def test_thread_produces_character_events(self):
|
||||
stop_event = threading.Event()
|
||||
output_queue = queue.Queue(maxsize=512)
|
||||
audio = generate_morse_audio('SOS', wpm=15)
|
||||
|
||||
worker = threading.Thread(
|
||||
target=morse_decoder_thread,
|
||||
args=(io.BytesIO(audio), output_queue, stop_event),
|
||||
daemon=True,
|
||||
)
|
||||
worker.start()
|
||||
worker.join(timeout=4.0)
|
||||
|
||||
events = []
|
||||
while not output_queue.empty():
|
||||
events.append(output_queue.get_nowait())
|
||||
|
||||
chars = [e for e in events if e.get('type') == 'morse_char']
|
||||
assert len(chars) >= 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Route lifecycle regression
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMorseLifecycleRoutes:
|
||||
def _reset_route_state(self):
|
||||
with app_module.morse_lock:
|
||||
app_module.morse_process = None
|
||||
while not app_module.morse_queue.empty():
|
||||
try:
|
||||
app_module.morse_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
morse_routes.morse_active_device = None
|
||||
morse_routes.morse_decoder_worker = None
|
||||
morse_routes.morse_stderr_worker = None
|
||||
morse_routes.morse_stop_event = None
|
||||
morse_routes.morse_control_queue = None
|
||||
morse_routes.morse_runtime_config = {}
|
||||
morse_routes.morse_last_error = ''
|
||||
morse_routes.morse_state = morse_routes.MORSE_IDLE
|
||||
morse_routes.morse_state_message = 'Idle'
|
||||
|
||||
def test_start_stop_reaches_idle_and_releases_resources(self, client, monkeypatch):
|
||||
_login_session(client)
|
||||
self._reset_route_state()
|
||||
|
||||
released_devices = []
|
||||
|
||||
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None)
|
||||
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx))
|
||||
|
||||
class DummyDevice:
|
||||
sdr_type = morse_routes.SDRType.RTL_SDR
|
||||
|
||||
class DummyBuilder:
|
||||
def build_fm_demod_command(self, **kwargs):
|
||||
return ['rtl_fm', '-f', '14060000']
|
||||
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice()))
|
||||
monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder()))
|
||||
monkeypatch.setattr(morse_routes.time, 'sleep', lambda _secs: None)
|
||||
|
||||
pcm = generate_morse_audio('E', wpm=15)
|
||||
|
||||
class FakeProc:
|
||||
def __init__(self):
|
||||
self.stdout = io.BytesIO(pcm)
|
||||
self.stderr = io.BytesIO(b'')
|
||||
self.returncode = None
|
||||
|
||||
def poll(self):
|
||||
return self.returncode
|
||||
|
||||
monkeypatch.setattr(morse_routes.subprocess, 'Popen', lambda *args, **kwargs: FakeProc())
|
||||
monkeypatch.setattr(morse_routes, 'register_process', lambda _proc: None)
|
||||
monkeypatch.setattr(morse_routes, 'unregister_process', lambda _proc: None)
|
||||
monkeypatch.setattr(
|
||||
morse_routes,
|
||||
'safe_terminate',
|
||||
lambda proc, timeout=0.0: setattr(proc, 'returncode', 0),
|
||||
)
|
||||
|
||||
start_resp = client.post('/morse/start', json={
|
||||
'frequency': '14.060',
|
||||
'gain': '20',
|
||||
'ppm': '0',
|
||||
'device': '0',
|
||||
'tone_freq': '700',
|
||||
'wpm': '15',
|
||||
})
|
||||
assert start_resp.status_code == 200
|
||||
assert start_resp.get_json()['status'] == 'started'
|
||||
|
||||
status_resp = client.get('/morse/status')
|
||||
assert status_resp.status_code == 200
|
||||
assert status_resp.get_json()['state'] in {'running', 'starting', 'stopping', 'idle'}
|
||||
|
||||
stop_resp = client.post('/morse/stop')
|
||||
assert stop_resp.status_code == 200
|
||||
stop_data = stop_resp.get_json()
|
||||
assert stop_data['status'] == 'stopped'
|
||||
assert stop_data['state'] == 'idle'
|
||||
assert stop_data['alive'] == []
|
||||
|
||||
final_status = client.get('/morse/status').get_json()
|
||||
assert final_status['running'] is False
|
||||
assert final_status['state'] == 'idle'
|
||||
assert 0 in released_devices
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration: synthetic CW -> WAV decode
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMorseIntegration:
|
||||
def test_decode_morse_wav_contains_expected_phrase(self, tmp_path):
|
||||
wav_path = tmp_path / 'cq_test_123.wav'
|
||||
pcm = generate_morse_audio('CQ TEST 123', wpm=15, tone_freq=700.0)
|
||||
write_wav(wav_path, pcm, sample_rate=8000)
|
||||
|
||||
result = decode_morse_wav_file(
|
||||
wav_path,
|
||||
sample_rate=8000,
|
||||
tone_freq=700.0,
|
||||
wpm=15,
|
||||
bandwidth_hz=200,
|
||||
auto_tone_track=True,
|
||||
threshold_mode='auto',
|
||||
wpm_mode='auto',
|
||||
min_signal_gate=0.0,
|
||||
)
|
||||
|
||||
decoded = ' '.join(str(result.get('text', '')).split())
|
||||
assert 'CQ TEST 123' in decoded
|
||||
|
||||
events = result.get('events', [])
|
||||
event_counts = Counter(e.get('type') for e in events)
|
||||
assert event_counts['morse_char'] >= len('CQTEST123')
|
||||
|
||||
Reference in New Issue
Block a user