Add real-time WebSocket waterfall with I/Q capture and server-side FFT

Replace the batch rtl_power SSE pipeline with continuous I/Q streaming
via WebSocket for smooth ~25fps waterfall display. The server captures
raw I/Q samples (rtl_sdr/rx_sdr), computes Hann-windowed FFT, and
sends compact binary frames (1035 bytes vs ~15KB JSON, 93% reduction).
Client falls back to existing SSE path if WebSocket is unavailable.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-08 12:37:50 +00:00
parent 7aae2944d4
commit 026337a350
12 changed files with 1107 additions and 8 deletions
+8
View File
@@ -869,6 +869,14 @@ def main() -> None:
except ImportError as e:
print(f"KiwiSDR audio proxy disabled: {e}")
# Initialize WebSocket for waterfall streaming
try:
from routes.waterfall_websocket import init_waterfall_websocket
init_waterfall_websocket(app)
print("WebSocket waterfall streaming enabled")
except ImportError as e:
print(f"WebSocket waterfall disabled: {e}")
print(f"Open http://localhost:{args.port} in your browser")
print()
print("Press Ctrl+C to stop")
+326
View File
@@ -0,0 +1,326 @@
"""WebSocket-based waterfall streaming with I/Q capture and server-side FFT."""
import json
import subprocess
import threading
import time
from flask import Flask
try:
from flask_sock import Sock
WEBSOCKET_AVAILABLE = True
except ImportError:
WEBSOCKET_AVAILABLE = False
Sock = None
from utils.logging import get_logger
from utils.process import safe_terminate, register_process, unregister_process
from utils.waterfall_fft import (
build_binary_frame,
compute_power_spectrum,
cu8_to_complex,
quantize_to_uint8,
)
from utils.sdr import SDRFactory, SDRType
from utils.sdr.base import SDRCapabilities, SDRDevice
logger = get_logger('intercept.waterfall_ws')
# Maximum bandwidth per SDR type (Hz)
MAX_BANDWIDTH = {
SDRType.RTL_SDR: 2400000,
SDRType.HACKRF: 20000000,
SDRType.LIME_SDR: 20000000,
SDRType.AIRSPY: 10000000,
SDRType.SDRPLAY: 2000000,
}
def _resolve_sdr_type(sdr_type_str: str) -> SDRType:
"""Convert client sdr_type string to SDRType enum."""
mapping = {
'rtlsdr': SDRType.RTL_SDR,
'rtl_sdr': SDRType.RTL_SDR,
'hackrf': SDRType.HACKRF,
'limesdr': SDRType.LIME_SDR,
'lime_sdr': SDRType.LIME_SDR,
'airspy': SDRType.AIRSPY,
'sdrplay': SDRType.SDRPLAY,
}
return mapping.get(sdr_type_str.lower(), SDRType.RTL_SDR)
def _build_dummy_device(device_index: int, sdr_type: SDRType) -> SDRDevice:
"""Build a minimal SDRDevice for command building."""
builder = SDRFactory.get_builder(sdr_type)
caps = builder.get_capabilities()
return SDRDevice(
sdr_type=sdr_type,
index=device_index,
name=f'{sdr_type.value}-{device_index}',
serial='N/A',
driver=sdr_type.value,
capabilities=caps,
)
def init_waterfall_websocket(app: Flask):
"""Initialize WebSocket waterfall streaming."""
if not WEBSOCKET_AVAILABLE:
logger.warning("flask-sock not installed, WebSocket waterfall disabled")
return
sock = Sock(app)
@sock.route('/ws/waterfall')
def waterfall_stream(ws):
"""WebSocket endpoint for real-time waterfall streaming."""
logger.info("WebSocket waterfall client connected")
# Import app module for device claiming
import app as app_module
iq_process = None
reader_thread = None
stop_event = threading.Event()
claimed_device = None
try:
while True:
try:
msg = ws.receive(timeout=0.1)
except TimeoutError:
if stop_event.is_set():
break
continue
except Exception as e:
if "closed" in str(e).lower():
break
if "timed out" not in str(e).lower():
logger.error(f"WebSocket receive error: {e}")
continue
if msg is None:
break
try:
data = json.loads(msg)
except (json.JSONDecodeError, TypeError):
continue
cmd = data.get('cmd')
if cmd == 'start':
# Stop any existing capture
stop_event.set()
if reader_thread and reader_thread.is_alive():
reader_thread.join(timeout=2)
if iq_process:
safe_terminate(iq_process)
unregister_process(iq_process)
iq_process = None
if claimed_device is not None:
app_module.release_sdr_device(claimed_device)
claimed_device = None
stop_event.clear()
# Parse config
center_freq = float(data.get('center_freq', 100.0))
span_mhz = float(data.get('span_mhz', 2.0))
gain = data.get('gain')
if gain is not None:
gain = float(gain)
device_index = int(data.get('device', 0))
sdr_type_str = data.get('sdr_type', 'rtlsdr')
fft_size = int(data.get('fft_size', 1024))
fps = int(data.get('fps', 25))
avg_count = int(data.get('avg_count', 4))
ppm = data.get('ppm')
if ppm is not None:
ppm = int(ppm)
bias_t = bool(data.get('bias_t', False))
# Clamp FFT size to valid powers of 2
fft_size = max(256, min(8192, fft_size))
# Resolve SDR type and bandwidth
sdr_type = _resolve_sdr_type(sdr_type_str)
max_bw = MAX_BANDWIDTH.get(sdr_type, 2400000)
span_hz = int(span_mhz * 1e6)
sample_rate = min(span_hz, max_bw)
# Compute effective frequency range
effective_span_mhz = sample_rate / 1e6
start_freq = center_freq - effective_span_mhz / 2
end_freq = center_freq + effective_span_mhz / 2
# Claim the device
claim_err = app_module.claim_sdr_device(device_index, 'waterfall')
if claim_err:
ws.send(json.dumps({
'status': 'error',
'message': claim_err,
'error_type': 'DEVICE_BUSY',
}))
continue
claimed_device = device_index
# Build I/Q capture command
try:
builder = SDRFactory.get_builder(sdr_type)
device = _build_dummy_device(device_index, sdr_type)
iq_cmd = builder.build_iq_capture_command(
device=device,
frequency_mhz=center_freq,
sample_rate=sample_rate,
gain=gain,
ppm=ppm,
bias_t=bias_t,
)
except NotImplementedError as e:
app_module.release_sdr_device(device_index)
claimed_device = None
ws.send(json.dumps({
'status': 'error',
'message': str(e),
}))
continue
# Spawn I/Q capture process
try:
logger.info(
f"Starting I/Q capture: {center_freq} MHz, "
f"span={effective_span_mhz:.1f} MHz, "
f"sr={sample_rate}, fft={fft_size}"
)
iq_process = subprocess.Popen(
iq_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
bufsize=0,
)
register_process(iq_process)
# Brief check that process started
time.sleep(0.2)
if iq_process.poll() is not None:
raise RuntimeError("I/Q capture process exited immediately")
except Exception as e:
logger.error(f"Failed to start I/Q capture: {e}")
if iq_process:
safe_terminate(iq_process)
unregister_process(iq_process)
iq_process = None
app_module.release_sdr_device(device_index)
claimed_device = None
ws.send(json.dumps({
'status': 'error',
'message': f'Failed to start I/Q capture: {e}',
}))
continue
# Send started confirmation
ws.send(json.dumps({
'status': 'started',
'start_freq': start_freq,
'end_freq': end_freq,
'fft_size': fft_size,
'sample_rate': sample_rate,
}))
# Start reader thread
def fft_reader(
proc, ws_ref, stop_evt,
_fft_size, _avg_count, _fps,
_start_freq, _end_freq,
):
"""Read I/Q from subprocess, compute FFT, send binary frames."""
bytes_per_frame = _fft_size * _avg_count * 2
frame_interval = 1.0 / _fps
try:
while not stop_evt.is_set():
if proc.poll() is not None:
break
frame_start = time.monotonic()
# Read raw I/Q bytes
raw = b''
remaining = bytes_per_frame
while remaining > 0 and not stop_evt.is_set():
chunk = proc.stdout.read(min(remaining, 65536))
if not chunk:
break
raw += chunk
remaining -= len(chunk)
if len(raw) < _fft_size * 2:
break
# Process FFT pipeline
samples = cu8_to_complex(raw)
power_db = compute_power_spectrum(
samples,
fft_size=_fft_size,
avg_count=_avg_count,
)
quantized = quantize_to_uint8(power_db)
frame = build_binary_frame(
_start_freq, _end_freq, quantized,
)
try:
ws_ref.send(frame)
except Exception:
break
# Pace to target FPS
elapsed = time.monotonic() - frame_start
sleep_time = frame_interval - elapsed
if sleep_time > 0:
stop_evt.wait(sleep_time)
except Exception as e:
logger.debug(f"FFT reader stopped: {e}")
reader_thread = threading.Thread(
target=fft_reader,
args=(
iq_process, ws, stop_event,
fft_size, avg_count, fps,
start_freq, end_freq,
),
daemon=True,
)
reader_thread.start()
elif cmd == 'stop':
stop_event.set()
if reader_thread and reader_thread.is_alive():
reader_thread.join(timeout=2)
reader_thread = None
if iq_process:
safe_terminate(iq_process)
unregister_process(iq_process)
iq_process = None
if claimed_device is not None:
app_module.release_sdr_device(claimed_device)
claimed_device = None
stop_event.clear()
ws.send(json.dumps({'status': 'stopped'}))
except Exception as e:
logger.info(f"WebSocket waterfall closed: {e}")
finally:
# Cleanup
stop_event.set()
if reader_thread and reader_thread.is_alive():
reader_thread.join(timeout=2)
if iq_process:
safe_terminate(iq_process)
unregister_process(iq_process)
if claimed_device is not None:
app_module.release_sdr_device(claimed_device)
logger.info("WebSocket waterfall client disconnected")
+256 -2
View File
@@ -3095,6 +3095,10 @@ const WATERFALL_ZOOM_MIN_MHZ = 0.1;
const WATERFALL_ZOOM_MAX_MHZ = 500;
const WATERFALL_DEFAULT_SPAN_MHZ = 2.0;
// WebSocket waterfall state
let waterfallWebSocket = null;
let waterfallUseWebSocket = false;
function resizeCanvasToDisplaySize(canvas) {
if (!canvas) return false;
const dpr = window.devicePixelRatio || 1;
@@ -3525,11 +3529,199 @@ function drawSpectrumLine(bins, startFreq, endFreq, labelUnit) {
spectrumCtx.fill();
}
function connectWaterfallWebSocket(config) {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws/waterfall`;
return new Promise((resolve, reject) => {
try {
const ws = new WebSocket(wsUrl);
ws.binaryType = 'arraybuffer';
const timeout = setTimeout(() => {
ws.close();
reject(new Error('WebSocket connection timeout'));
}, 5000);
ws.onopen = () => {
clearTimeout(timeout);
ws.send(JSON.stringify({ cmd: 'start', ...config }));
};
ws.onmessage = (event) => {
if (typeof event.data === 'string') {
const msg = JSON.parse(event.data);
if (msg.status === 'started') {
waterfallWebSocket = ws;
waterfallUseWebSocket = true;
if (typeof msg.start_freq === 'number') waterfallStartFreq = msg.start_freq;
if (typeof msg.end_freq === 'number') waterfallEndFreq = msg.end_freq;
const rangeLabel = document.getElementById('waterfallFreqRange');
if (rangeLabel) {
rangeLabel.textContent = `${waterfallStartFreq.toFixed(1)} - ${waterfallEndFreq.toFixed(1)} MHz`;
}
updateWaterfallZoomLabel(waterfallStartFreq, waterfallEndFreq);
resolve(ws);
} else if (msg.status === 'error') {
ws.close();
reject(new Error(msg.message || 'WebSocket waterfall error'));
} else if (msg.status === 'stopped') {
// Server confirmed stop
}
} else if (event.data instanceof ArrayBuffer) {
const now = Date.now();
if (now - lastWaterfallDraw < WATERFALL_MIN_INTERVAL_MS) return;
lastWaterfallDraw = now;
parseBinaryWaterfallFrame(event.data);
}
};
ws.onerror = () => {
clearTimeout(timeout);
reject(new Error('WebSocket connection failed'));
};
ws.onclose = () => {
if (waterfallUseWebSocket && isWaterfallRunning) {
waterfallWebSocket = null;
waterfallUseWebSocket = false;
isWaterfallRunning = false;
setWaterfallControlButtons(false);
if (typeof releaseDevice === 'function') {
releaseDevice('waterfall');
}
}
};
} catch (e) {
reject(e);
}
});
}
function parseBinaryWaterfallFrame(buffer) {
if (buffer.byteLength < 11) return;
const view = new DataView(buffer);
const msgType = view.getUint8(0);
if (msgType !== 0x01) return;
const startFreq = view.getFloat32(1, true);
const endFreq = view.getFloat32(5, true);
const binCount = view.getUint16(9, true);
if (buffer.byteLength < 11 + binCount) return;
const bins = new Uint8Array(buffer, 11, binCount);
waterfallStartFreq = startFreq;
waterfallEndFreq = endFreq;
const rangeLabel = document.getElementById('waterfallFreqRange');
if (rangeLabel) {
rangeLabel.textContent = `${startFreq.toFixed(1)} - ${endFreq.toFixed(1)} MHz`;
}
updateWaterfallZoomLabel(startFreq, endFreq);
drawWaterfallRowBinary(bins);
drawSpectrumLineBinary(bins, startFreq, endFreq);
}
function drawWaterfallRowBinary(bins) {
if (!waterfallCtx || !waterfallCanvas) return;
const w = waterfallCanvas.width;
const h = waterfallCanvas.height;
const rowHeight = waterfallRowImage ? waterfallRowImage.height : 1;
// Scroll existing content down
waterfallCtx.drawImage(waterfallCanvas, 0, 0, w, h - rowHeight, 0, rowHeight, w, h - rowHeight);
if (!waterfallRowImage || waterfallRowImage.width !== w || waterfallRowImage.height !== rowHeight) {
waterfallRowImage = waterfallCtx.createImageData(w, rowHeight);
}
const rowData = waterfallRowImage.data;
const palette = waterfallPalette || buildWaterfallPalette();
const binCount = bins.length;
for (let x = 0; x < w; x++) {
const pos = (x / (w - 1)) * (binCount - 1);
const i0 = Math.floor(pos);
const i1 = Math.min(binCount - 1, i0 + 1);
const t = pos - i0;
// Interpolate between bins (already uint8, 0-255)
const val = Math.round(bins[i0] * (1 - t) + bins[i1] * t);
const color = palette[Math.max(0, Math.min(255, val))] || [0, 0, 0];
for (let y = 0; y < rowHeight; y++) {
const offset = (y * w + x) * 4;
rowData[offset] = color[0];
rowData[offset + 1] = color[1];
rowData[offset + 2] = color[2];
rowData[offset + 3] = 255;
}
}
waterfallCtx.putImageData(waterfallRowImage, 0, 0);
}
function drawSpectrumLineBinary(bins, startFreq, endFreq) {
if (!spectrumCtx || !spectrumCanvas) return;
const w = spectrumCanvas.width;
const h = spectrumCanvas.height;
spectrumCtx.clearRect(0, 0, w, h);
// Background
spectrumCtx.fillStyle = 'rgba(0, 0, 0, 0.8)';
spectrumCtx.fillRect(0, 0, w, h);
// Grid lines
spectrumCtx.strokeStyle = 'rgba(0, 200, 255, 0.1)';
spectrumCtx.lineWidth = 0.5;
for (let i = 0; i < 5; i++) {
const y = (h / 5) * i;
spectrumCtx.beginPath();
spectrumCtx.moveTo(0, y);
spectrumCtx.lineTo(w, y);
spectrumCtx.stroke();
}
// Frequency labels
const dpr = window.devicePixelRatio || 1;
spectrumCtx.fillStyle = 'rgba(0, 200, 255, 0.5)';
spectrumCtx.font = `${9 * dpr}px monospace`;
const freqRange = endFreq - startFreq;
for (let i = 0; i <= 4; i++) {
const freq = startFreq + (freqRange / 4) * i;
const x = (w / 4) * i;
spectrumCtx.fillText(freq.toFixed(1), x + 2, h - 2);
}
if (bins.length === 0) return;
// Draw spectrum line — bins are pre-quantized 0-255
spectrumCtx.strokeStyle = 'rgba(0, 255, 255, 0.9)';
spectrumCtx.lineWidth = 1.5;
spectrumCtx.beginPath();
for (let i = 0; i < bins.length; i++) {
const x = (i / (bins.length - 1)) * w;
const normalized = bins[i] / 255;
const y = h - 12 - normalized * (h - 16);
if (i === 0) spectrumCtx.moveTo(x, y);
else spectrumCtx.lineTo(x, y);
}
spectrumCtx.stroke();
// Fill under line
const lastX = w;
const lastY = h - 12 - (bins[bins.length - 1] / 255) * (h - 16);
spectrumCtx.lineTo(lastX, h);
spectrumCtx.lineTo(0, h);
spectrumCtx.closePath();
spectrumCtx.fillStyle = 'rgba(0, 255, 255, 0.08)';
spectrumCtx.fill();
}
async function startWaterfall(options = {}) {
const { silent = false, resume = false } = options;
const startFreq = parseFloat(document.getElementById('waterfallStartFreq')?.value || 88);
const endFreq = parseFloat(document.getElementById('waterfallEndFreq')?.value || 108);
const binSize = parseInt(document.getElementById('waterfallBinSize')?.value || 10000);
const fftSize = parseInt(document.getElementById('waterfallFftSize')?.value || document.getElementById('waterfallBinSize')?.value || 1024);
const gain = parseInt(document.getElementById('waterfallGain')?.value || 40);
const device = typeof getSelectedDevice === 'function' ? getSelectedDevice() : 0;
initWaterfallCanvas();
@@ -3565,10 +3757,51 @@ async function startWaterfall(options = {}) {
}
setWaterfallMode('rf');
const spanMhz = Math.max(0.1, waterfallEndFreq - waterfallStartFreq);
// Try WebSocket path first (I/Q + server-side FFT)
const centerFreq = (startFreq + endFreq) / 2;
const spanMhz = Math.max(0.1, endFreq - startFreq);
try {
const wsConfig = {
center_freq: centerFreq,
span_mhz: spanMhz,
gain: gain,
device: device,
sdr_type: (typeof getSelectedSdrType === 'function') ? getSelectedSdrType() : 'rtlsdr',
fft_size: fftSize,
fps: 25,
avg_count: 4,
};
await connectWaterfallWebSocket(wsConfig);
isWaterfallRunning = true;
setWaterfallControlButtons(true);
const waterfallPanel = document.getElementById('waterfallPanel');
if (waterfallPanel) waterfallPanel.style.display = 'block';
lastWaterfallDraw = 0;
initWaterfallCanvas();
if (typeof reserveDevice === 'function') {
reserveDevice(parseInt(device), 'waterfall');
}
if (resume || resumeRfWaterfallAfterListening) {
resumeRfWaterfallAfterListening = false;
}
if (waterfallResumeTimer) {
clearTimeout(waterfallResumeTimer);
waterfallResumeTimer = null;
}
console.log('[WATERFALL] WebSocket connected');
return { started: true };
} catch (wsErr) {
console.log('[WATERFALL] WebSocket unavailable, falling back to SSE:', wsErr.message);
}
// Fallback: SSE / rtl_power path
const segments = Math.max(1, Math.ceil(spanMhz / 2.4));
const targetSweepSeconds = 0.8;
const interval = Math.max(0.1, Math.min(0.3, targetSweepSeconds / segments));
const binSize = fftSize;
try {
const response = await fetch('/listening/waterfall/start', {
@@ -3635,6 +3868,27 @@ async function stopWaterfall() {
return;
}
// WebSocket path
if (waterfallUseWebSocket && waterfallWebSocket) {
try {
if (waterfallWebSocket.readyState === WebSocket.OPEN) {
waterfallWebSocket.send(JSON.stringify({ cmd: 'stop' }));
}
waterfallWebSocket.close();
} catch (e) {
console.error('[WATERFALL] WebSocket stop error:', e);
}
waterfallWebSocket = null;
waterfallUseWebSocket = false;
isWaterfallRunning = false;
setWaterfallControlButtons(false);
if (typeof releaseDevice === 'function') {
releaseDevice('waterfall');
}
return;
}
// SSE fallback path
try {
await fetch('/listening/waterfall/stop', { method: 'POST' });
isWaterfallRunning = false;
+6 -6
View File
@@ -525,12 +525,12 @@
</div>
</div>
<div class="form-group" style="margin-bottom: 6px;">
<label style="font-size: 10px;">Bin Size</label>
<select id="waterfallBinSize" style="width: 100%; padding: 5px; background: var(--bg-secondary); border: 1px solid var(--border-color); color: var(--text-primary); border-radius: 4px; font-size: 11px;">
<option value="5000">5 kHz</option>
<option value="10000" selected>10 kHz</option>
<option value="25000">25 kHz</option>
<option value="100000">100 kHz</option>
<label style="font-size: 10px;">FFT Size</label>
<select id="waterfallFftSize" style="width: 100%; padding: 5px; background: var(--bg-secondary); border: 1px solid var(--border-color); color: var(--text-primary); border-radius: 4px; font-size: 11px;">
<option value="512">512</option>
<option value="1024" selected>1024</option>
<option value="2048">2048</option>
<option value="4096">4096</option>
</select>
</div>
<div class="form-group" style="margin-bottom: 8px;">
+168
View File
@@ -0,0 +1,168 @@
"""Tests for the waterfall FFT pipeline."""
import struct
import numpy as np
import pytest
from utils.waterfall_fft import (
build_binary_frame,
compute_power_spectrum,
cu8_to_complex,
quantize_to_uint8,
)
class TestCu8ToComplex:
"""Tests for cu8_to_complex conversion."""
def test_zero_maps_to_negative_one(self):
# I=0, Q=0 -> approximately -1 - 1j
result = cu8_to_complex(bytes([0, 0]))
assert result[0].real == pytest.approx(-1.0, abs=0.01)
assert result[0].imag == pytest.approx(-1.0, abs=0.01)
def test_255_maps_to_positive_one(self):
# I=255, Q=255 -> approximately +1 + 1j
result = cu8_to_complex(bytes([255, 255]))
assert result[0].real == pytest.approx(1.0, abs=0.01)
assert result[0].imag == pytest.approx(1.0, abs=0.01)
def test_128_maps_to_near_zero(self):
# I=128, Q=128 -> approximately 0 + 0j
result = cu8_to_complex(bytes([128, 128]))
assert abs(result[0].real) < 0.01
assert abs(result[0].imag) < 0.01
def test_output_length(self):
raw = bytes(range(256)) * 4 # 1024 bytes -> 512 complex samples
result = cu8_to_complex(raw)
assert len(result) == 512
def test_output_dtype(self):
result = cu8_to_complex(bytes([100, 200, 50, 150]))
assert result.dtype == np.complex64 or np.issubdtype(result.dtype, np.complexfloating)
class TestComputePowerSpectrum:
"""Tests for compute_power_spectrum."""
def test_output_length_matches_fft_size(self):
samples = np.zeros(4096, dtype=np.complex64)
result = compute_power_spectrum(samples, fft_size=1024, avg_count=4)
assert len(result) == 1024
def test_output_dtype(self):
samples = np.zeros(4096, dtype=np.complex64)
result = compute_power_spectrum(samples, fft_size=1024, avg_count=4)
assert result.dtype == np.float32
def test_pure_tone_peak_at_correct_bin(self):
fft_size = 1024
avg_count = 4
n = fft_size * avg_count
# Generate a pure tone at bin 256 (1/4 of sample rate)
t = np.arange(n, dtype=np.float32)
freq_bin = 256
tone = np.exp(2j * np.pi * freq_bin / fft_size * t).astype(np.complex64)
result = compute_power_spectrum(tone, fft_size=fft_size, avg_count=avg_count)
# After fftshift, bin 256 maps to index 256 + 512 = 768
peak_idx = np.argmax(result)
expected_idx = fft_size // 2 + freq_bin
assert peak_idx == expected_idx
def test_insufficient_samples_returns_default(self):
# Not enough samples for even one segment
samples = np.zeros(100, dtype=np.complex64)
result = compute_power_spectrum(samples, fft_size=1024, avg_count=4)
assert len(result) == 1024
assert np.all(result == -100.0)
def test_partial_avg_count(self):
# Only enough for 2 of 4 requested averages
fft_size = 1024
samples = np.random.randn(2048).astype(np.float32).view(np.complex64)
result = compute_power_spectrum(samples, fft_size=fft_size, avg_count=4)
assert len(result) == fft_size
# Should still return valid dB values (not -100 default)
assert np.any(result != -100.0)
class TestQuantizeToUint8:
"""Tests for quantize_to_uint8."""
def test_db_min_maps_to_zero(self):
power = np.array([-90.0], dtype=np.float32)
result = quantize_to_uint8(power, db_min=-90, db_max=-20)
assert result[0] == 0
def test_db_max_maps_to_255(self):
power = np.array([-20.0], dtype=np.float32)
result = quantize_to_uint8(power, db_min=-90, db_max=-20)
assert result[0] == 255
def test_below_min_clamped_to_zero(self):
power = np.array([-120.0], dtype=np.float32)
result = quantize_to_uint8(power, db_min=-90, db_max=-20)
assert result[0] == 0
def test_above_max_clamped_to_255(self):
power = np.array([0.0], dtype=np.float32)
result = quantize_to_uint8(power, db_min=-90, db_max=-20)
assert result[0] == 255
def test_midpoint(self):
# Midpoint between -90 and -20 is -55 -> ~127-128
power = np.array([-55.0], dtype=np.float32)
result = quantize_to_uint8(power, db_min=-90, db_max=-20)
assert 125 <= result[0] <= 130
def test_output_length(self):
power = np.random.randn(1024).astype(np.float32) * 30 - 60
result = quantize_to_uint8(power)
assert len(result) == 1024
class TestBuildBinaryFrame:
"""Tests for build_binary_frame."""
def test_header_values(self):
bins = bytes([128] * 1024)
frame = build_binary_frame(100.0, 102.0, bins)
msg_type = frame[0]
start_freq, end_freq = struct.unpack_from('<ff', frame, 1)
bin_count = struct.unpack_from('<H', frame, 9)[0]
assert msg_type == 0x01
assert start_freq == pytest.approx(100.0, abs=0.01)
assert end_freq == pytest.approx(102.0, abs=0.01)
assert bin_count == 1024
def test_total_length(self):
bin_count = 1024
bins = bytes([0] * bin_count)
frame = build_binary_frame(88.0, 108.0, bins)
assert len(frame) == 11 + bin_count
def test_bins_in_payload(self):
bins = bytes(range(256))
frame = build_binary_frame(0.0, 1.0, bins)
payload = frame[11:]
assert payload == bins
def test_round_trip(self):
start = 433.0
end = 435.0
bins = bytes([i % 256 for i in range(2048)])
frame = build_binary_frame(start, end, bins)
# Parse it back
msg_type = frame[0]
parsed_start, parsed_end = struct.unpack_from('<ff', frame, 1)
parsed_count = struct.unpack_from('<H', frame, 9)[0]
parsed_bins = frame[11:]
assert msg_type == 0x01
assert parsed_start == pytest.approx(start, abs=0.01)
assert parsed_end == pytest.approx(end, abs=0.01)
assert parsed_count == 2048
assert parsed_bins == bins
+37
View File
@@ -185,6 +185,43 @@ class AirspyCommandBuilder(CommandBuilder):
return cmd
def build_iq_capture_command(
self,
device: SDRDevice,
frequency_mhz: float,
sample_rate: int = 2048000,
gain: Optional[float] = None,
ppm: Optional[int] = None,
bias_t: bool = False,
output_format: str = 'cu8',
) -> list[str]:
"""
Build rx_sdr command for raw I/Q capture with Airspy.
Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display.
"""
device_str = self._build_device_string(device)
freq_hz = int(frequency_mhz * 1e6)
cmd = [
'rx_sdr',
'-d', device_str,
'-f', str(freq_hz),
'-s', str(sample_rate),
'-F', 'CU8',
]
if gain is not None and gain > 0:
cmd.extend(['-g', self._format_gain(gain)])
if bias_t:
cmd.append('-T')
# Output to stdout
cmd.append('-')
return cmd
def get_capabilities(self) -> SDRCapabilities:
"""Return Airspy capabilities."""
return self.CAPABILITIES
+35
View File
@@ -186,6 +186,41 @@ class CommandBuilder(ABC):
"""Return hardware capabilities for this SDR type."""
pass
def build_iq_capture_command(
self,
device: SDRDevice,
frequency_mhz: float,
sample_rate: int = 2048000,
gain: Optional[float] = None,
ppm: Optional[int] = None,
bias_t: bool = False,
output_format: str = 'cu8',
) -> list[str]:
"""
Build raw I/Q capture command for streaming samples to stdout.
Used for real-time waterfall/spectrum display. Output is unsigned
8-bit I/Q pairs (cu8) written continuously to stdout.
Args:
device: The SDR device to use
frequency_mhz: Center frequency in MHz
sample_rate: Sample rate in Hz (default 2048000)
gain: Gain in dB (None for auto)
ppm: PPM frequency correction
bias_t: Enable bias-T power (for active antennas)
output_format: Output sample format (default 'cu8')
Returns:
Command as list of strings for subprocess
Raises:
NotImplementedError: If the SDR type does not support I/Q capture.
"""
raise NotImplementedError(
f"{self.__class__.__name__} does not support raw I/Q capture"
)
@classmethod
@abstractmethod
def get_sdr_type(cls) -> SDRType:
+38
View File
@@ -185,6 +185,44 @@ class HackRFCommandBuilder(CommandBuilder):
return cmd
def build_iq_capture_command(
self,
device: SDRDevice,
frequency_mhz: float,
sample_rate: int = 2048000,
gain: Optional[float] = None,
ppm: Optional[int] = None,
bias_t: bool = False,
output_format: str = 'cu8',
) -> list[str]:
"""
Build rx_sdr command for raw I/Q capture with HackRF.
Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display.
"""
device_str = self._build_device_string(device)
freq_hz = int(frequency_mhz * 1e6)
cmd = [
'rx_sdr',
'-d', device_str,
'-f', str(freq_hz),
'-s', str(sample_rate),
'-F', 'CU8',
]
if gain is not None and gain > 0:
lna, vga = self._split_gain(gain)
cmd.extend(['-g', f'LNA={lna},VGA={vga}'])
if bias_t:
cmd.append('-T')
# Output to stdout
cmd.append('-')
return cmd
def get_capabilities(self) -> SDRCapabilities:
"""Return HackRF capabilities."""
return self.CAPABILITIES
+35
View File
@@ -162,6 +162,41 @@ class LimeSDRCommandBuilder(CommandBuilder):
return cmd
def build_iq_capture_command(
self,
device: SDRDevice,
frequency_mhz: float,
sample_rate: int = 2048000,
gain: Optional[float] = None,
ppm: Optional[int] = None,
bias_t: bool = False,
output_format: str = 'cu8',
) -> list[str]:
"""
Build rx_sdr command for raw I/Q capture with LimeSDR.
Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display.
Note: LimeSDR does not support bias-T, parameter is ignored.
"""
device_str = self._build_device_string(device)
freq_hz = int(frequency_mhz * 1e6)
cmd = [
'rx_sdr',
'-d', device_str,
'-f', str(freq_hz),
'-s', str(sample_rate),
'-F', 'CU8',
]
if gain is not None and gain > 0:
cmd.extend(['-g', f'LNAH={int(gain)}'])
# Output to stdout
cmd.append('-')
return cmd
def get_capabilities(self) -> SDRCapabilities:
"""Return LimeSDR capabilities."""
return self.CAPABILITIES
+39
View File
@@ -231,6 +231,45 @@ class RTLSDRCommandBuilder(CommandBuilder):
return cmd
def build_iq_capture_command(
self,
device: SDRDevice,
frequency_mhz: float,
sample_rate: int = 2048000,
gain: Optional[float] = None,
ppm: Optional[int] = None,
bias_t: bool = False,
output_format: str = 'cu8',
) -> list[str]:
"""
Build rtl_sdr command for raw I/Q capture.
Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display.
"""
rtl_sdr_path = get_tool_path('rtl_sdr') or 'rtl_sdr'
freq_hz = int(frequency_mhz * 1e6)
cmd = [
rtl_sdr_path,
'-d', self._get_device_arg(device),
'-f', str(freq_hz),
'-s', str(sample_rate),
]
if gain is not None and gain > 0:
cmd.extend(['-g', str(gain)])
if ppm is not None and ppm != 0:
cmd.extend(['-p', str(ppm)])
if bias_t:
cmd.append('-T')
# Output to stdout
cmd.append('-')
return cmd
def get_capabilities(self) -> SDRCapabilities:
"""Return RTL-SDR capabilities."""
return self.CAPABILITIES
+37
View File
@@ -163,6 +163,43 @@ class SDRPlayCommandBuilder(CommandBuilder):
return cmd
def build_iq_capture_command(
self,
device: SDRDevice,
frequency_mhz: float,
sample_rate: int = 2048000,
gain: Optional[float] = None,
ppm: Optional[int] = None,
bias_t: bool = False,
output_format: str = 'cu8',
) -> list[str]:
"""
Build rx_sdr command for raw I/Q capture with SDRPlay.
Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display.
"""
device_str = self._build_device_string(device)
freq_hz = int(frequency_mhz * 1e6)
cmd = [
'rx_sdr',
'-d', device_str,
'-f', str(freq_hz),
'-s', str(sample_rate),
'-F', 'CU8',
]
if gain is not None and gain > 0:
cmd.extend(['-g', f'IFGR={int(gain)}'])
if bias_t:
cmd.append('-T')
# Output to stdout
cmd.append('-')
return cmd
def get_capabilities(self) -> SDRCapabilities:
"""Return SDRPlay capabilities."""
return self.CAPABILITIES
+122
View File
@@ -0,0 +1,122 @@
"""FFT pipeline for real-time waterfall display.
Converts raw I/Q samples from SDR hardware into quantized power spectrum
frames suitable for binary WebSocket transmission.
"""
from __future__ import annotations
import struct
import numpy as np
def cu8_to_complex(raw: bytes) -> np.ndarray:
"""Convert unsigned 8-bit I/Q bytes to complex64.
RTL-SDR (and rx_sdr with -F cu8) outputs interleaved unsigned 8-bit
I/Q pairs where 128 is the zero point.
Args:
raw: Raw bytes, length must be even (I/Q pairs).
Returns:
Complex64 array of length len(raw) // 2.
"""
iq = np.frombuffer(raw, dtype=np.uint8).astype(np.float32)
# Normalize: 0 -> -1.0, 128 -> ~0.0, 255 -> +1.0
iq = (iq - 127.5) / 127.5
return iq[0::2] + 1j * iq[1::2]
def compute_power_spectrum(
samples: np.ndarray,
fft_size: int = 1024,
avg_count: int = 4,
) -> np.ndarray:
"""Compute averaged power spectrum in dBm.
Applies a Hann window, computes FFT, converts to power (dB),
and averages over multiple segments.
Args:
samples: Complex64 array, length >= fft_size * avg_count.
fft_size: Number of FFT bins.
avg_count: Number of segments to average.
Returns:
Float32 array of length fft_size with power in dB (fftshift'd).
"""
window = np.hanning(fft_size).astype(np.float32)
accum = np.zeros(fft_size, dtype=np.float32)
actual_avg = 0
for i in range(avg_count):
offset = i * fft_size
if offset + fft_size > len(samples):
break
segment = samples[offset : offset + fft_size] * window
spectrum = np.fft.fft(segment)
power = np.real(spectrum * np.conj(spectrum))
# Avoid log10(0)
power = np.maximum(power, 1e-20)
accum += 10.0 * np.log10(power)
actual_avg += 1
if actual_avg == 0:
return np.full(fft_size, -100.0, dtype=np.float32)
accum /= actual_avg
return np.fft.fftshift(accum).astype(np.float32)
def quantize_to_uint8(
power_db: np.ndarray,
db_min: float = -90.0,
db_max: float = -20.0,
) -> bytes:
"""Clamp and scale dB values to 0-255.
Args:
power_db: Float32 array of power values in dB.
db_min: Value mapped to 0.
db_max: Value mapped to 255.
Returns:
Bytes of length len(power_db), each in [0, 255].
"""
db_range = db_max - db_min
if db_range <= 0:
db_range = 1.0
scaled = (power_db - db_min) / db_range * 255.0
clamped = np.clip(scaled, 0, 255).astype(np.uint8)
return clamped.tobytes()
def build_binary_frame(
start_freq: float,
end_freq: float,
quantized_bins: bytes,
) -> bytes:
"""Pack a binary waterfall frame for WebSocket transmission.
Wire format (little-endian):
[uint8 msg_type=0x01]
[float32 start_freq]
[float32 end_freq]
[uint16 bin_count]
[uint8[] bins]
Total size = 11 + bin_count bytes.
Args:
start_freq: Start frequency in MHz.
end_freq: End frequency in MHz.
quantized_bins: Pre-quantized uint8 bin data.
Returns:
Binary frame bytes.
"""
bin_count = len(quantized_bins)
header = struct.pack('<BffH', 0x01, start_freq, end_freq, bin_count)
return header + quantized_bins