Files
intercept/utils/morse.py
Smittix 9622a00ea1 Fix Morse reader to bypass BufferedReader via os.read on raw fd
BufferedReader.read(n) on non-interactive streams (Python 3.14) blocks
until the full n bytes accumulate, starving the decoder of real-time
PCM data. Use os.read() on the raw file descriptor instead, which
returns as soon as any data is available. Falls back to .read() for
file-like objects without fileno() (e.g. BytesIO in tests).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 10:00:38 +00:00

373 lines
14 KiB
Python

"""Morse code (CW) decoder using Goertzel tone detection.
Signal chain: rtl_fm -M usb → raw PCM → Goertzel filter → timing state machine → characters.
"""
from __future__ import annotations
import contextlib
import math
import os
import queue
import struct
import threading
import time
from datetime import datetime
from typing import Any
# International Morse Code table
MORSE_TABLE: dict[str, str] = {
'.-': 'A', '-...': 'B', '-.-.': 'C', '-..': 'D', '.': 'E',
'..-.': 'F', '--.': 'G', '....': 'H', '..': 'I', '.---': 'J',
'-.-': 'K', '.-..': 'L', '--': 'M', '-.': 'N', '---': 'O',
'.--.': 'P', '--.-': 'Q', '.-.': 'R', '...': 'S', '-': 'T',
'..-': 'U', '...-': 'V', '.--': 'W', '-..-': 'X', '-.--': 'Y',
'--..': 'Z',
'-----': '0', '.----': '1', '..---': '2', '...--': '3',
'....-': '4', '.....': '5', '-....': '6', '--...': '7',
'---..': '8', '----.': '9',
'.-.-.-': '.', '--..--': ',', '..--..': '?', '.----.': "'",
'-.-.--': '!', '-..-.': '/', '-.--.': '(', '-.--.-': ')',
'.-...': '&', '---...': ':', '-.-.-.': ';', '-...-': '=',
'.-.-.': '+', '-....-': '-', '..--.-': '_', '.-..-.': '"',
'...-..-': '$', '.--.-.': '@',
# Prosigns (unique codes only; -...- and -.--.- already mapped above)
'-.-.-': '<CT>', '.-.-': '<AA>', '...-.-': '<SK>',
}
# Reverse lookup: character → morse notation
CHAR_TO_MORSE: dict[str, str] = {v: k for k, v in MORSE_TABLE.items()}
class GoertzelFilter:
"""Single-frequency tone detector using the Goertzel algorithm.
O(N) per block, much cheaper than FFT for detecting one frequency.
"""
def __init__(self, target_freq: float, sample_rate: int, block_size: int):
self.target_freq = target_freq
self.sample_rate = sample_rate
self.block_size = block_size
# Precompute coefficient
k = round(target_freq * block_size / sample_rate)
omega = 2.0 * math.pi * k / block_size
self.coeff = 2.0 * math.cos(omega)
def magnitude(self, samples: list[float] | tuple[float, ...]) -> float:
"""Compute magnitude of the target frequency in the sample block."""
s0 = 0.0
s1 = 0.0
s2 = 0.0
coeff = self.coeff
for sample in samples:
s0 = sample + coeff * s1 - s2
s2 = s1
s1 = s0
return math.sqrt(s1 * s1 + s2 * s2 - coeff * s1 * s2)
class MorseDecoder:
"""Real-time Morse decoder with adaptive threshold.
Processes blocks of PCM audio and emits decoded characters.
Timing based on PARIS standard: dit = 1.2/WPM seconds.
"""
def __init__(
self,
sample_rate: int = 8000,
tone_freq: float = 700.0,
wpm: int = 15,
):
self.sample_rate = sample_rate
self.tone_freq = tone_freq
self.wpm = wpm
# Goertzel filter: ~50 blocks/sec at 8kHz
self._block_size = sample_rate // 50
self._filter = GoertzelFilter(tone_freq, sample_rate, self._block_size)
self._block_duration = self._block_size / sample_rate # seconds per block
# Timing thresholds (in blocks, converted from seconds)
dit_sec = 1.2 / wpm
self._dah_threshold = 2.0 * dit_sec / self._block_duration # blocks
self._dit_min = 0.3 * dit_sec / self._block_duration # min blocks for dit
self._char_gap = 3.0 * dit_sec / self._block_duration # blocks
self._word_gap = 7.0 * dit_sec / self._block_duration # blocks
# AGC (automatic gain control) for direct sampling / weak signals
self._agc_target = 0.3 # target RMS amplitude (0-1 range)
self._agc_gain = 1.0 # current AGC multiplier
self._agc_alpha = 0.05 # EMA smoothing for gain changes
# Warm-up phase constants
self._WARMUP_BLOCKS = 50 # ~1 second at 50 blocks/sec
self._SETTLE_BLOCKS = 200 # blocks for fast→slow EMA transition
self._mag_min = float('inf')
self._mag_max = 0.0
# Adaptive threshold via EMA
self._noise_floor = 0.0
self._signal_peak = 0.0
self._threshold = 0.0
# State machine (counts in blocks, not wall-clock time)
self._tone_on = False
self._tone_blocks = 0 # blocks since tone started
self._silence_blocks = 0 # blocks since silence started
self._current_symbol = '' # accumulates dits/dahs for current char
self._pending_buffer: list[float] = []
self._blocks_processed = 0 # total blocks for warm-up tracking
def process_block(self, pcm_bytes: bytes) -> list[dict[str, Any]]:
"""Process a chunk of 16-bit LE PCM and return decoded events.
Returns list of event dicts with keys:
type: 'scope' | 'morse_char' | 'morse_space'
+ type-specific fields
"""
events: list[dict[str, Any]] = []
# Unpack PCM samples
n_samples = len(pcm_bytes) // 2
if n_samples == 0:
return events
samples = struct.unpack(f'<{n_samples}h', pcm_bytes[:n_samples * 2])
# Feed samples into pending buffer and process in blocks
self._pending_buffer.extend(samples)
amplitudes: list[float] = []
while len(self._pending_buffer) >= self._block_size:
block = self._pending_buffer[:self._block_size]
self._pending_buffer = self._pending_buffer[self._block_size:]
# Normalize to [-1, 1]
normalized = [s / 32768.0 for s in block]
# AGC: boost quiet signals (e.g. direct sampling mode)
rms = math.sqrt(sum(s * s for s in normalized) / len(normalized))
if rms > 1e-6:
desired_gain = self._agc_target / rms
self._agc_gain += self._agc_alpha * (desired_gain - self._agc_gain)
self._agc_gain = min(self._agc_gain, 500.0) # cap to prevent runaway
normalized = [s * self._agc_gain for s in normalized]
mag = self._filter.magnitude(normalized)
amplitudes.append(mag)
self._blocks_processed += 1
# Warm-up phase: collect statistics, suppress detection
if self._blocks_processed <= self._WARMUP_BLOCKS:
self._mag_min = min(self._mag_min, mag)
self._mag_max = max(self._mag_max, mag)
if self._blocks_processed == self._WARMUP_BLOCKS:
# Seed thresholds from observed range
self._noise_floor = self._mag_min
self._signal_peak = max(self._mag_max, self._mag_min * 2)
self._threshold = self._noise_floor + 0.3 * (
self._signal_peak - self._noise_floor
)
tone_detected = False
else:
# Adaptive EMA: fast initially, slow in steady state
alpha = 0.3 if self._blocks_processed < self._WARMUP_BLOCKS + self._SETTLE_BLOCKS else 0.05
if mag < self._threshold:
self._noise_floor += alpha * (mag - self._noise_floor)
else:
self._signal_peak += alpha * (mag - self._signal_peak)
# Threshold at 30% between noise and signal (sensitive to weak CW)
self._threshold = self._noise_floor + 0.3 * (
self._signal_peak - self._noise_floor
)
tone_detected = mag > self._threshold and self._threshold > 0
if tone_detected and not self._tone_on:
# Tone just started - check silence duration for gaps
self._tone_on = True
silence_count = self._silence_blocks
self._tone_blocks = 0
if self._current_symbol and silence_count >= self._char_gap:
# Character gap - decode accumulated symbol
char = MORSE_TABLE.get(self._current_symbol)
if char:
events.append({
'type': 'morse_char',
'char': char,
'morse': self._current_symbol,
'timestamp': datetime.now().strftime('%H:%M:%S'),
})
if silence_count >= self._word_gap:
events.append({
'type': 'morse_space',
'timestamp': datetime.now().strftime('%H:%M:%S'),
})
self._current_symbol = ''
elif not tone_detected and self._tone_on:
# Tone just ended - classify as dit or dah
self._tone_on = False
tone_count = self._tone_blocks
self._silence_blocks = 0
if tone_count >= self._dah_threshold:
self._current_symbol += '-'
elif tone_count >= self._dit_min:
self._current_symbol += '.'
elif tone_detected and self._tone_on:
self._tone_blocks += 1
elif not tone_detected and not self._tone_on:
self._silence_blocks += 1
# Emit scope data for visualization (~10 Hz is handled by caller)
if amplitudes:
events.append({
'type': 'scope',
'amplitudes': amplitudes,
'threshold': self._threshold,
'tone_on': self._tone_on,
})
return events
def flush(self) -> list[dict[str, Any]]:
"""Flush any pending symbol at end of stream."""
events: list[dict[str, Any]] = []
if self._current_symbol:
char = MORSE_TABLE.get(self._current_symbol)
if char:
events.append({
'type': 'morse_char',
'char': char,
'morse': self._current_symbol,
'timestamp': datetime.now().strftime('%H:%M:%S'),
})
self._current_symbol = ''
return events
def _stdout_reader(stdout, data_queue: queue.Queue, stop_event: threading.Event) -> None:
"""Blocking reader — pushes raw PCM chunks to queue, None on EOF.
Uses os.read() on the raw fd when available to bypass BufferedReader,
which on Python 3.14 may block trying to fill its entire buffer before
returning. Falls back to .read() for objects without fileno() (tests).
"""
try:
fd = stdout.fileno()
except Exception:
fd = None
try:
while not stop_event.is_set():
if fd is not None:
data = os.read(fd, 4096)
else:
data = stdout.read(4096)
if not data:
break
data_queue.put(data)
except Exception:
pass
finally:
data_queue.put(None) # sentinel: pipe closed / EOF
def morse_decoder_thread(
rtl_stdout,
output_queue: queue.Queue,
stop_event: threading.Event,
sample_rate: int = 8000,
tone_freq: float = 700.0,
wpm: int = 15,
) -> None:
"""Thread function: reads PCM from rtl_fm, decodes Morse, pushes to queue.
Reads raw 16-bit LE PCM from *rtl_stdout* and feeds it through the
MorseDecoder, pushing scope and character events onto *output_queue*.
"""
import logging
logger = logging.getLogger('intercept.morse')
CHUNK = 4096 # bytes per read (2048 samples at 16-bit mono)
SCOPE_INTERVAL = 0.1 # scope updates at ~10 Hz
last_scope = time.monotonic()
waiting_since: float | None = None
decoder = MorseDecoder(
sample_rate=sample_rate,
tone_freq=tone_freq,
wpm=wpm,
)
try:
pcm_queue: queue.Queue = queue.Queue(maxsize=50)
reader = threading.Thread(
target=_stdout_reader,
args=(rtl_stdout, pcm_queue, stop_event),
)
reader.daemon = True
reader.start()
while not stop_event.is_set():
try:
data = pcm_queue.get(timeout=2.0)
except queue.Empty:
# No data from SDR — emit diagnostic heartbeat
now = time.monotonic()
if waiting_since is None:
waiting_since = now
if now - last_scope >= SCOPE_INTERVAL:
last_scope = now
with contextlib.suppress(queue.Full):
output_queue.put_nowait({
'type': 'scope',
'amplitudes': [],
'threshold': 0,
'tone_on': False,
'waiting': True,
'waiting_seconds': round(now - waiting_since, 1),
})
continue
if data is None: # EOF sentinel
break
waiting_since = None
events = decoder.process_block(data)
for event in events:
if event['type'] == 'scope':
# Throttle scope events to ~10 Hz
now = time.monotonic()
if now - last_scope >= SCOPE_INTERVAL:
last_scope = now
with contextlib.suppress(queue.Full):
output_queue.put_nowait(event)
else:
# Character and space events always go through
with contextlib.suppress(queue.Full):
output_queue.put_nowait(event)
except Exception as e:
logger.debug(f"Morse decoder thread error: {e}")
finally:
# Flush any pending symbol
for event in decoder.flush():
with contextlib.suppress(queue.Full):
output_queue.put_nowait(event)
# Notify frontend that the decoder has stopped (e.g. rtl_fm died)
with contextlib.suppress(queue.Full):
output_queue.put_nowait({'type': 'status', 'status': 'stopped'})