diff --git a/routes/pager.py b/routes/pager.py
index 4ee5425..3253a6c 100644
--- a/routes/pager.py
+++ b/routes/pager.py
@@ -2,12 +2,14 @@
from __future__ import annotations
+import math
import os
import pathlib
import re
import pty
import queue
import select
+import struct
import subprocess
import threading
import time
@@ -22,8 +24,8 @@ from utils.validation import (
validate_frequency, validate_device_index, validate_gain, validate_ppm,
validate_rtl_tcp_host, validate_rtl_tcp_port
)
-from utils.sse import format_sse
-from utils.event_pipeline import process_event
+from utils.sse import format_sse
+from utils.event_pipeline import process_event
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType, SDRValidationError
from utils.dependencies import get_tool_path
@@ -106,6 +108,62 @@ def log_message(msg: dict[str, Any]) -> None:
logger.error(f"Failed to log message: {e}")
+def audio_relay_thread(
+ rtl_stdout,
+ multimon_stdin,
+ output_queue: queue.Queue,
+ stop_event: threading.Event,
+) -> None:
+ """Relay audio from rtl_fm to multimon-ng while computing signal levels.
+
+ Reads raw 16-bit LE PCM from *rtl_stdout*, writes every chunk straight
+ through to *multimon_stdin*, and every ~100 ms pushes an RMS / peak scope
+ event onto *output_queue*.
+ """
+ CHUNK = 4096 # bytes – 2048 samples at 16-bit mono
+ INTERVAL = 0.1 # seconds between scope updates
+ last_scope = time.monotonic()
+
+ try:
+ while not stop_event.is_set():
+ data = rtl_stdout.read(CHUNK)
+ if not data:
+ break
+
+ # Forward audio untouched
+ try:
+ multimon_stdin.write(data)
+ multimon_stdin.flush()
+ except (BrokenPipeError, OSError):
+ break
+
+ # Compute scope levels every ~100 ms
+ now = time.monotonic()
+ if now - last_scope >= INTERVAL:
+ last_scope = now
+ try:
+ n_samples = len(data) // 2
+ if n_samples == 0:
+ continue
+ samples = struct.unpack(f'<{n_samples}h', data[:n_samples * 2])
+ peak = max(abs(s) for s in samples)
+ rms = int(math.sqrt(sum(s * s for s in samples) / n_samples))
+ output_queue.put_nowait({
+ 'type': 'scope',
+ 'rms': rms,
+ 'peak': peak,
+ })
+ except (struct.error, ValueError, queue.Full):
+ pass
+ except Exception as e:
+ logger.debug(f"Audio relay error: {e}")
+ finally:
+ try:
+ multimon_stdin.close()
+ except OSError:
+ pass
+
+
def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
"""Stream decoder output to queue using PTY for unbuffered output."""
try:
@@ -152,6 +210,11 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
os.close(master_fd)
except OSError:
pass
+ # Signal relay thread to stop
+ with app_module.process_lock:
+ stop_relay = getattr(app_module.current_process, '_stop_relay', None)
+ if stop_relay:
+ stop_relay.set()
# Cleanup companion rtl_fm process and decoder
with app_module.process_lock:
rtl_proc = getattr(app_module.current_process, '_rtl_process', None)
@@ -319,7 +382,7 @@ def start_decoding() -> Response:
multimon_process = subprocess.Popen(
multimon_cmd,
- stdin=rtl_process.stdout,
+ stdin=subprocess.PIPE,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True
@@ -327,11 +390,22 @@ def start_decoding() -> Response:
register_process(multimon_process)
os.close(slave_fd)
- rtl_process.stdout.close()
+
+ # Spawn audio relay thread between rtl_fm and multimon-ng
+ stop_relay = threading.Event()
+ relay = threading.Thread(
+ target=audio_relay_thread,
+ args=(rtl_process.stdout, multimon_process.stdin,
+ app_module.output_queue, stop_relay),
+ )
+ relay.daemon = True
+ relay.start()
app_module.current_process = multimon_process
app_module.current_process._rtl_process = rtl_process
app_module.current_process._master_fd = master_fd
+ app_module.current_process._stop_relay = stop_relay
+ app_module.current_process._relay_thread = relay
# Start output thread with PTY master fd
thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process))
@@ -380,6 +454,10 @@ def stop_decoding() -> Response:
with app_module.process_lock:
if app_module.current_process:
+ # Signal audio relay thread to stop
+ if hasattr(app_module.current_process, '_stop_relay'):
+ app_module.current_process._stop_relay.set()
+
# Kill rtl_fm process first
if hasattr(app_module.current_process, '_rtl_process'):
try:
@@ -469,14 +547,14 @@ def stream() -> Response:
keepalive_interval = 30.0 # Send keepalive every 30 seconds instead of 1 second
while True:
- try:
- msg = app_module.output_queue.get(timeout=1)
- last_keepalive = time.time()
- try:
- process_event('pager', msg, msg.get('type'))
- except Exception:
- pass
- yield format_sse(msg)
+ try:
+ msg = app_module.output_queue.get(timeout=1)
+ last_keepalive = time.time()
+ try:
+ process_event('pager', msg, msg.get('type'))
+ except Exception:
+ pass
+ yield format_sse(msg)
except queue.Empty:
now = time.time()
if now - last_keepalive >= keepalive_interval:
diff --git a/templates/index.html b/templates/index.html
index 25131ed..e0bab5d 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -2204,6 +2204,21 @@
+
+
+