Add real-time signal scope to 433MHz sensor mode

Enable -M level on rtl_433 to include RSSI/SNR in decoded JSON, extract
signal levels and push scope events to the SSE stream. Renders a green-
themed canvas oscilloscope showing signal strength pulses on packet decode
with amber SNR indicator and decay between packets.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-08 00:00:01 +00:00
parent 154dc898ff
commit 92e5e7c6da
2 changed files with 219 additions and 13 deletions

View File

@@ -18,8 +18,8 @@ from utils.validation import (
validate_frequency, validate_device_index, validate_gain, validate_ppm,
validate_rtl_tcp_host, validate_rtl_tcp_port
)
from utils.sse import format_sse
from utils.event_pipeline import process_event
from utils.sse import format_sse
from utils.event_pipeline import process_event
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType
@@ -45,6 +45,21 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None:
data['type'] = 'sensor'
app_module.sensor_queue.put(data)
# Push scope event when signal level data is present
rssi = data.get('rssi')
snr = data.get('snr')
noise = data.get('noise')
if rssi is not None or snr is not None:
try:
app_module.sensor_queue.put_nowait({
'type': 'scope',
'rssi': rssi if rssi is not None else 0,
'snr': snr if snr is not None else 0,
'noise': noise if noise is not None else 0,
})
except queue.Full:
pass
# Log if enabled
if app_module.logging_enabled:
try:
@@ -158,6 +173,9 @@ def start_sensor() -> Response:
full_cmd = ' '.join(cmd)
logger.info(f"Running: {full_cmd}")
# Add signal level metadata so the frontend scope can display RSSI/SNR
cmd.extend(['-M', 'level'])
try:
app_module.sensor_process = subprocess.Popen(
cmd,
@@ -232,13 +250,13 @@ def stream_sensor() -> Response:
while True:
try:
msg = app_module.sensor_queue.get(timeout=1)
last_keepalive = time.time()
try:
process_event('sensor', msg, msg.get('type'))
except Exception:
pass
yield format_sse(msg)
msg = app_module.sensor_queue.get(timeout=1)
last_keepalive = time.time()
try:
process_event('sensor', msg, msg.get('type'))
except Exception:
pass
yield format_sse(msg)
except queue.Empty:
now = time.time()
if now - last_keepalive >= keepalive_interval: