mirror of
https://github.com/smittix/intercept.git
synced 2026-05-29 20:39:26 -07:00
Use non-blocking pipe reads and raw-stream telemetry for Morse
This commit is contained in:
@@ -544,6 +544,7 @@ def start_morse() -> Response:
|
||||
stop_event = threading.Event()
|
||||
control_queue = queue.Queue(maxsize=16)
|
||||
pcm_ready_event = threading.Event()
|
||||
stream_ready_event = threading.Event()
|
||||
attempt_stderr_lines: list[str] = []
|
||||
|
||||
def monitor_stderr(
|
||||
@@ -598,6 +599,7 @@ def start_morse() -> Response:
|
||||
'decoder_config': runtime_config,
|
||||
'control_queue': control_queue,
|
||||
'pcm_ready_event': pcm_ready_event,
|
||||
'stream_ready_event': stream_ready_event,
|
||||
},
|
||||
daemon=True,
|
||||
name='morse-decoder',
|
||||
@@ -617,6 +619,7 @@ def start_morse() -> Response:
|
||||
'decoder_config': runtime_config,
|
||||
'control_queue': control_queue,
|
||||
'pcm_ready_event': pcm_ready_event,
|
||||
'stream_ready_event': stream_ready_event,
|
||||
'strip_text_chunks': False,
|
||||
},
|
||||
daemon=True,
|
||||
@@ -642,6 +645,8 @@ def start_morse() -> Response:
|
||||
startup_error = 'No PCM samples received within startup timeout'
|
||||
if attempt_stderr_lines:
|
||||
startup_error = f'{startup_error}; stderr: {attempt_stderr_lines[-1]}'
|
||||
if stream_ready_event.is_set():
|
||||
startup_error = f'{startup_error}; stream=alive'
|
||||
|
||||
is_last_attempt = attempt_index == len(command_attempts)
|
||||
if (
|
||||
|
||||
@@ -11,7 +11,9 @@ from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import math
|
||||
import os
|
||||
import queue
|
||||
import select
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
@@ -763,6 +765,7 @@ def morse_decoder_thread(
|
||||
decoder_config: dict[str, Any] | None = None,
|
||||
control_queue: queue.Queue | None = None,
|
||||
pcm_ready_event: threading.Event | None = None,
|
||||
stream_ready_event: threading.Event | None = None,
|
||||
strip_text_chunks: bool = False,
|
||||
) -> None:
|
||||
"""Decode Morse from live PCM stream and push events to *output_queue*."""
|
||||
@@ -800,16 +803,26 @@ def morse_decoder_thread(
|
||||
first_pcm_logged = False
|
||||
reader_done = threading.Event()
|
||||
reader_thread: threading.Thread | None = None
|
||||
first_raw_logged = False
|
||||
|
||||
raw_queue: queue.Queue[bytes] = queue.Queue(maxsize=96)
|
||||
|
||||
try:
|
||||
def _reader_loop() -> None:
|
||||
"""Blocking PCM reader isolated from decode/control loop."""
|
||||
nonlocal first_raw_logged
|
||||
try:
|
||||
fd = None
|
||||
with contextlib.suppress(Exception):
|
||||
fd = rtl_stdout.fileno()
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
if hasattr(rtl_stdout, 'read1'):
|
||||
if fd is not None:
|
||||
ready, _, _ = select.select([fd], [], [], 0.20)
|
||||
if not ready:
|
||||
continue
|
||||
data = os.read(fd, CHUNK)
|
||||
elif hasattr(rtl_stdout, 'read1'):
|
||||
data = rtl_stdout.read1(CHUNK)
|
||||
else:
|
||||
data = rtl_stdout.read(CHUNK)
|
||||
@@ -827,6 +840,16 @@ def morse_decoder_thread(
|
||||
if not data:
|
||||
break
|
||||
|
||||
if not first_raw_logged:
|
||||
first_raw_logged = True
|
||||
if stream_ready_event is not None:
|
||||
stream_ready_event.set()
|
||||
with contextlib.suppress(queue.Full):
|
||||
output_queue.put_nowait({
|
||||
'type': 'info',
|
||||
'text': f'[pcm] first raw chunk: {len(data)} bytes',
|
||||
})
|
||||
|
||||
if strip_text_chunks and _is_probably_rtl_log_text(data):
|
||||
try:
|
||||
text = data.decode('utf-8', errors='replace')
|
||||
@@ -1026,6 +1049,7 @@ def morse_iq_decoder_thread(
|
||||
decoder_config: dict[str, Any] | None = None,
|
||||
control_queue: queue.Queue | None = None,
|
||||
pcm_ready_event: threading.Event | None = None,
|
||||
stream_ready_event: threading.Event | None = None,
|
||||
) -> None:
|
||||
"""Decode Morse from raw IQ (cu8) by in-process USB demodulation."""
|
||||
import logging
|
||||
@@ -1062,15 +1086,25 @@ def morse_iq_decoder_thread(
|
||||
first_pcm_logged = False
|
||||
reader_done = threading.Event()
|
||||
reader_thread: threading.Thread | None = None
|
||||
first_raw_logged = False
|
||||
|
||||
raw_queue: queue.Queue[bytes] = queue.Queue(maxsize=96)
|
||||
|
||||
try:
|
||||
def _reader_loop() -> None:
|
||||
nonlocal first_raw_logged
|
||||
try:
|
||||
fd = None
|
||||
with contextlib.suppress(Exception):
|
||||
fd = iq_stdout.fileno()
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
if hasattr(iq_stdout, 'read1'):
|
||||
if fd is not None:
|
||||
ready, _, _ = select.select([fd], [], [], 0.20)
|
||||
if not ready:
|
||||
continue
|
||||
data = os.read(fd, CHUNK)
|
||||
elif hasattr(iq_stdout, 'read1'):
|
||||
data = iq_stdout.read1(CHUNK)
|
||||
else:
|
||||
data = iq_stdout.read(CHUNK)
|
||||
@@ -1087,6 +1121,16 @@ def morse_iq_decoder_thread(
|
||||
if not data:
|
||||
break
|
||||
|
||||
if not first_raw_logged:
|
||||
first_raw_logged = True
|
||||
if stream_ready_event is not None:
|
||||
stream_ready_event.set()
|
||||
with contextlib.suppress(queue.Full):
|
||||
output_queue.put_nowait({
|
||||
'type': 'info',
|
||||
'text': f'[iq] first raw chunk: {len(data)} bytes',
|
||||
})
|
||||
|
||||
try:
|
||||
raw_queue.put(data, timeout=0.2)
|
||||
except queue.Full:
|
||||
|
||||
Reference in New Issue
Block a user