diff --git a/app.py b/app.py index 01d8d1c..470f909 100644 --- a/app.py +++ b/app.py @@ -869,6 +869,14 @@ def main() -> None: except ImportError as e: print(f"KiwiSDR audio proxy disabled: {e}") + # Initialize WebSocket for waterfall streaming + try: + from routes.waterfall_websocket import init_waterfall_websocket + init_waterfall_websocket(app) + print("WebSocket waterfall streaming enabled") + except ImportError as e: + print(f"WebSocket waterfall disabled: {e}") + print(f"Open http://localhost:{args.port} in your browser") print() print("Press Ctrl+C to stop") diff --git a/routes/waterfall_websocket.py b/routes/waterfall_websocket.py new file mode 100644 index 0000000..5cdd29e --- /dev/null +++ b/routes/waterfall_websocket.py @@ -0,0 +1,326 @@ +"""WebSocket-based waterfall streaming with I/Q capture and server-side FFT.""" + +import json +import subprocess +import threading +import time + +from flask import Flask + +try: + from flask_sock import Sock + WEBSOCKET_AVAILABLE = True +except ImportError: + WEBSOCKET_AVAILABLE = False + Sock = None + +from utils.logging import get_logger +from utils.process import safe_terminate, register_process, unregister_process +from utils.waterfall_fft import ( + build_binary_frame, + compute_power_spectrum, + cu8_to_complex, + quantize_to_uint8, +) +from utils.sdr import SDRFactory, SDRType +from utils.sdr.base import SDRCapabilities, SDRDevice + +logger = get_logger('intercept.waterfall_ws') + +# Maximum bandwidth per SDR type (Hz) +MAX_BANDWIDTH = { + SDRType.RTL_SDR: 2400000, + SDRType.HACKRF: 20000000, + SDRType.LIME_SDR: 20000000, + SDRType.AIRSPY: 10000000, + SDRType.SDRPLAY: 2000000, +} + + +def _resolve_sdr_type(sdr_type_str: str) -> SDRType: + """Convert client sdr_type string to SDRType enum.""" + mapping = { + 'rtlsdr': SDRType.RTL_SDR, + 'rtl_sdr': SDRType.RTL_SDR, + 'hackrf': SDRType.HACKRF, + 'limesdr': SDRType.LIME_SDR, + 'lime_sdr': SDRType.LIME_SDR, + 'airspy': SDRType.AIRSPY, + 'sdrplay': SDRType.SDRPLAY, + } + return mapping.get(sdr_type_str.lower(), SDRType.RTL_SDR) + + +def _build_dummy_device(device_index: int, sdr_type: SDRType) -> SDRDevice: + """Build a minimal SDRDevice for command building.""" + builder = SDRFactory.get_builder(sdr_type) + caps = builder.get_capabilities() + return SDRDevice( + sdr_type=sdr_type, + index=device_index, + name=f'{sdr_type.value}-{device_index}', + serial='N/A', + driver=sdr_type.value, + capabilities=caps, + ) + + +def init_waterfall_websocket(app: Flask): + """Initialize WebSocket waterfall streaming.""" + if not WEBSOCKET_AVAILABLE: + logger.warning("flask-sock not installed, WebSocket waterfall disabled") + return + + sock = Sock(app) + + @sock.route('/ws/waterfall') + def waterfall_stream(ws): + """WebSocket endpoint for real-time waterfall streaming.""" + logger.info("WebSocket waterfall client connected") + + # Import app module for device claiming + import app as app_module + + iq_process = None + reader_thread = None + stop_event = threading.Event() + claimed_device = None + + try: + while True: + try: + msg = ws.receive(timeout=0.1) + except TimeoutError: + if stop_event.is_set(): + break + continue + except Exception as e: + if "closed" in str(e).lower(): + break + if "timed out" not in str(e).lower(): + logger.error(f"WebSocket receive error: {e}") + continue + + if msg is None: + break + + try: + data = json.loads(msg) + except (json.JSONDecodeError, TypeError): + continue + + cmd = data.get('cmd') + + if cmd == 'start': + # Stop any existing capture + stop_event.set() + if reader_thread and reader_thread.is_alive(): + reader_thread.join(timeout=2) + if iq_process: + safe_terminate(iq_process) + unregister_process(iq_process) + iq_process = None + if claimed_device is not None: + app_module.release_sdr_device(claimed_device) + claimed_device = None + stop_event.clear() + + # Parse config + center_freq = float(data.get('center_freq', 100.0)) + span_mhz = float(data.get('span_mhz', 2.0)) + gain = data.get('gain') + if gain is not None: + gain = float(gain) + device_index = int(data.get('device', 0)) + sdr_type_str = data.get('sdr_type', 'rtlsdr') + fft_size = int(data.get('fft_size', 1024)) + fps = int(data.get('fps', 25)) + avg_count = int(data.get('avg_count', 4)) + ppm = data.get('ppm') + if ppm is not None: + ppm = int(ppm) + bias_t = bool(data.get('bias_t', False)) + + # Clamp FFT size to valid powers of 2 + fft_size = max(256, min(8192, fft_size)) + + # Resolve SDR type and bandwidth + sdr_type = _resolve_sdr_type(sdr_type_str) + max_bw = MAX_BANDWIDTH.get(sdr_type, 2400000) + span_hz = int(span_mhz * 1e6) + sample_rate = min(span_hz, max_bw) + + # Compute effective frequency range + effective_span_mhz = sample_rate / 1e6 + start_freq = center_freq - effective_span_mhz / 2 + end_freq = center_freq + effective_span_mhz / 2 + + # Claim the device + claim_err = app_module.claim_sdr_device(device_index, 'waterfall') + if claim_err: + ws.send(json.dumps({ + 'status': 'error', + 'message': claim_err, + 'error_type': 'DEVICE_BUSY', + })) + continue + claimed_device = device_index + + # Build I/Q capture command + try: + builder = SDRFactory.get_builder(sdr_type) + device = _build_dummy_device(device_index, sdr_type) + iq_cmd = builder.build_iq_capture_command( + device=device, + frequency_mhz=center_freq, + sample_rate=sample_rate, + gain=gain, + ppm=ppm, + bias_t=bias_t, + ) + except NotImplementedError as e: + app_module.release_sdr_device(device_index) + claimed_device = None + ws.send(json.dumps({ + 'status': 'error', + 'message': str(e), + })) + continue + + # Spawn I/Q capture process + try: + logger.info( + f"Starting I/Q capture: {center_freq} MHz, " + f"span={effective_span_mhz:.1f} MHz, " + f"sr={sample_rate}, fft={fft_size}" + ) + iq_process = subprocess.Popen( + iq_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + bufsize=0, + ) + register_process(iq_process) + + # Brief check that process started + time.sleep(0.2) + if iq_process.poll() is not None: + raise RuntimeError("I/Q capture process exited immediately") + except Exception as e: + logger.error(f"Failed to start I/Q capture: {e}") + if iq_process: + safe_terminate(iq_process) + unregister_process(iq_process) + iq_process = None + app_module.release_sdr_device(device_index) + claimed_device = None + ws.send(json.dumps({ + 'status': 'error', + 'message': f'Failed to start I/Q capture: {e}', + })) + continue + + # Send started confirmation + ws.send(json.dumps({ + 'status': 'started', + 'start_freq': start_freq, + 'end_freq': end_freq, + 'fft_size': fft_size, + 'sample_rate': sample_rate, + })) + + # Start reader thread + def fft_reader( + proc, ws_ref, stop_evt, + _fft_size, _avg_count, _fps, + _start_freq, _end_freq, + ): + """Read I/Q from subprocess, compute FFT, send binary frames.""" + bytes_per_frame = _fft_size * _avg_count * 2 + frame_interval = 1.0 / _fps + + try: + while not stop_evt.is_set(): + if proc.poll() is not None: + break + + frame_start = time.monotonic() + + # Read raw I/Q bytes + raw = b'' + remaining = bytes_per_frame + while remaining > 0 and not stop_evt.is_set(): + chunk = proc.stdout.read(min(remaining, 65536)) + if not chunk: + break + raw += chunk + remaining -= len(chunk) + + if len(raw) < _fft_size * 2: + break + + # Process FFT pipeline + samples = cu8_to_complex(raw) + power_db = compute_power_spectrum( + samples, + fft_size=_fft_size, + avg_count=_avg_count, + ) + quantized = quantize_to_uint8(power_db) + frame = build_binary_frame( + _start_freq, _end_freq, quantized, + ) + + try: + ws_ref.send(frame) + except Exception: + break + + # Pace to target FPS + elapsed = time.monotonic() - frame_start + sleep_time = frame_interval - elapsed + if sleep_time > 0: + stop_evt.wait(sleep_time) + + except Exception as e: + logger.debug(f"FFT reader stopped: {e}") + + reader_thread = threading.Thread( + target=fft_reader, + args=( + iq_process, ws, stop_event, + fft_size, avg_count, fps, + start_freq, end_freq, + ), + daemon=True, + ) + reader_thread.start() + + elif cmd == 'stop': + stop_event.set() + if reader_thread and reader_thread.is_alive(): + reader_thread.join(timeout=2) + reader_thread = None + if iq_process: + safe_terminate(iq_process) + unregister_process(iq_process) + iq_process = None + if claimed_device is not None: + app_module.release_sdr_device(claimed_device) + claimed_device = None + stop_event.clear() + ws.send(json.dumps({'status': 'stopped'})) + + except Exception as e: + logger.info(f"WebSocket waterfall closed: {e}") + finally: + # Cleanup + stop_event.set() + if reader_thread and reader_thread.is_alive(): + reader_thread.join(timeout=2) + if iq_process: + safe_terminate(iq_process) + unregister_process(iq_process) + if claimed_device is not None: + app_module.release_sdr_device(claimed_device) + logger.info("WebSocket waterfall client disconnected") diff --git a/static/js/modes/listening-post.js b/static/js/modes/listening-post.js index e3bd908..e2bca2f 100644 --- a/static/js/modes/listening-post.js +++ b/static/js/modes/listening-post.js @@ -3095,6 +3095,10 @@ const WATERFALL_ZOOM_MIN_MHZ = 0.1; const WATERFALL_ZOOM_MAX_MHZ = 500; const WATERFALL_DEFAULT_SPAN_MHZ = 2.0; +// WebSocket waterfall state +let waterfallWebSocket = null; +let waterfallUseWebSocket = false; + function resizeCanvasToDisplaySize(canvas) { if (!canvas) return false; const dpr = window.devicePixelRatio || 1; @@ -3525,11 +3529,199 @@ function drawSpectrumLine(bins, startFreq, endFreq, labelUnit) { spectrumCtx.fill(); } +function connectWaterfallWebSocket(config) { + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsUrl = `${protocol}//${window.location.host}/ws/waterfall`; + + return new Promise((resolve, reject) => { + try { + const ws = new WebSocket(wsUrl); + ws.binaryType = 'arraybuffer'; + + const timeout = setTimeout(() => { + ws.close(); + reject(new Error('WebSocket connection timeout')); + }, 5000); + + ws.onopen = () => { + clearTimeout(timeout); + ws.send(JSON.stringify({ cmd: 'start', ...config })); + }; + + ws.onmessage = (event) => { + if (typeof event.data === 'string') { + const msg = JSON.parse(event.data); + if (msg.status === 'started') { + waterfallWebSocket = ws; + waterfallUseWebSocket = true; + if (typeof msg.start_freq === 'number') waterfallStartFreq = msg.start_freq; + if (typeof msg.end_freq === 'number') waterfallEndFreq = msg.end_freq; + const rangeLabel = document.getElementById('waterfallFreqRange'); + if (rangeLabel) { + rangeLabel.textContent = `${waterfallStartFreq.toFixed(1)} - ${waterfallEndFreq.toFixed(1)} MHz`; + } + updateWaterfallZoomLabel(waterfallStartFreq, waterfallEndFreq); + resolve(ws); + } else if (msg.status === 'error') { + ws.close(); + reject(new Error(msg.message || 'WebSocket waterfall error')); + } else if (msg.status === 'stopped') { + // Server confirmed stop + } + } else if (event.data instanceof ArrayBuffer) { + const now = Date.now(); + if (now - lastWaterfallDraw < WATERFALL_MIN_INTERVAL_MS) return; + lastWaterfallDraw = now; + parseBinaryWaterfallFrame(event.data); + } + }; + + ws.onerror = () => { + clearTimeout(timeout); + reject(new Error('WebSocket connection failed')); + }; + + ws.onclose = () => { + if (waterfallUseWebSocket && isWaterfallRunning) { + waterfallWebSocket = null; + waterfallUseWebSocket = false; + isWaterfallRunning = false; + setWaterfallControlButtons(false); + if (typeof releaseDevice === 'function') { + releaseDevice('waterfall'); + } + } + }; + } catch (e) { + reject(e); + } + }); +} + +function parseBinaryWaterfallFrame(buffer) { + if (buffer.byteLength < 11) return; + const view = new DataView(buffer); + const msgType = view.getUint8(0); + if (msgType !== 0x01) return; + + const startFreq = view.getFloat32(1, true); + const endFreq = view.getFloat32(5, true); + const binCount = view.getUint16(9, true); + + if (buffer.byteLength < 11 + binCount) return; + + const bins = new Uint8Array(buffer, 11, binCount); + + waterfallStartFreq = startFreq; + waterfallEndFreq = endFreq; + const rangeLabel = document.getElementById('waterfallFreqRange'); + if (rangeLabel) { + rangeLabel.textContent = `${startFreq.toFixed(1)} - ${endFreq.toFixed(1)} MHz`; + } + updateWaterfallZoomLabel(startFreq, endFreq); + + drawWaterfallRowBinary(bins); + drawSpectrumLineBinary(bins, startFreq, endFreq); +} + +function drawWaterfallRowBinary(bins) { + if (!waterfallCtx || !waterfallCanvas) return; + const w = waterfallCanvas.width; + const h = waterfallCanvas.height; + const rowHeight = waterfallRowImage ? waterfallRowImage.height : 1; + + // Scroll existing content down + waterfallCtx.drawImage(waterfallCanvas, 0, 0, w, h - rowHeight, 0, rowHeight, w, h - rowHeight); + + if (!waterfallRowImage || waterfallRowImage.width !== w || waterfallRowImage.height !== rowHeight) { + waterfallRowImage = waterfallCtx.createImageData(w, rowHeight); + } + const rowData = waterfallRowImage.data; + const palette = waterfallPalette || buildWaterfallPalette(); + const binCount = bins.length; + + for (let x = 0; x < w; x++) { + const pos = (x / (w - 1)) * (binCount - 1); + const i0 = Math.floor(pos); + const i1 = Math.min(binCount - 1, i0 + 1); + const t = pos - i0; + // Interpolate between bins (already uint8, 0-255) + const val = Math.round(bins[i0] * (1 - t) + bins[i1] * t); + const color = palette[Math.max(0, Math.min(255, val))] || [0, 0, 0]; + for (let y = 0; y < rowHeight; y++) { + const offset = (y * w + x) * 4; + rowData[offset] = color[0]; + rowData[offset + 1] = color[1]; + rowData[offset + 2] = color[2]; + rowData[offset + 3] = 255; + } + } + waterfallCtx.putImageData(waterfallRowImage, 0, 0); +} + +function drawSpectrumLineBinary(bins, startFreq, endFreq) { + if (!spectrumCtx || !spectrumCanvas) return; + const w = spectrumCanvas.width; + const h = spectrumCanvas.height; + + spectrumCtx.clearRect(0, 0, w, h); + + // Background + spectrumCtx.fillStyle = 'rgba(0, 0, 0, 0.8)'; + spectrumCtx.fillRect(0, 0, w, h); + + // Grid lines + spectrumCtx.strokeStyle = 'rgba(0, 200, 255, 0.1)'; + spectrumCtx.lineWidth = 0.5; + for (let i = 0; i < 5; i++) { + const y = (h / 5) * i; + spectrumCtx.beginPath(); + spectrumCtx.moveTo(0, y); + spectrumCtx.lineTo(w, y); + spectrumCtx.stroke(); + } + + // Frequency labels + const dpr = window.devicePixelRatio || 1; + spectrumCtx.fillStyle = 'rgba(0, 200, 255, 0.5)'; + spectrumCtx.font = `${9 * dpr}px monospace`; + const freqRange = endFreq - startFreq; + for (let i = 0; i <= 4; i++) { + const freq = startFreq + (freqRange / 4) * i; + const x = (w / 4) * i; + spectrumCtx.fillText(freq.toFixed(1), x + 2, h - 2); + } + + if (bins.length === 0) return; + + // Draw spectrum line — bins are pre-quantized 0-255 + spectrumCtx.strokeStyle = 'rgba(0, 255, 255, 0.9)'; + spectrumCtx.lineWidth = 1.5; + spectrumCtx.beginPath(); + for (let i = 0; i < bins.length; i++) { + const x = (i / (bins.length - 1)) * w; + const normalized = bins[i] / 255; + const y = h - 12 - normalized * (h - 16); + if (i === 0) spectrumCtx.moveTo(x, y); + else spectrumCtx.lineTo(x, y); + } + spectrumCtx.stroke(); + + // Fill under line + const lastX = w; + const lastY = h - 12 - (bins[bins.length - 1] / 255) * (h - 16); + spectrumCtx.lineTo(lastX, h); + spectrumCtx.lineTo(0, h); + spectrumCtx.closePath(); + spectrumCtx.fillStyle = 'rgba(0, 255, 255, 0.08)'; + spectrumCtx.fill(); +} + async function startWaterfall(options = {}) { const { silent = false, resume = false } = options; const startFreq = parseFloat(document.getElementById('waterfallStartFreq')?.value || 88); const endFreq = parseFloat(document.getElementById('waterfallEndFreq')?.value || 108); - const binSize = parseInt(document.getElementById('waterfallBinSize')?.value || 10000); + const fftSize = parseInt(document.getElementById('waterfallFftSize')?.value || document.getElementById('waterfallBinSize')?.value || 1024); const gain = parseInt(document.getElementById('waterfallGain')?.value || 40); const device = typeof getSelectedDevice === 'function' ? getSelectedDevice() : 0; initWaterfallCanvas(); @@ -3565,10 +3757,51 @@ async function startWaterfall(options = {}) { } setWaterfallMode('rf'); - const spanMhz = Math.max(0.1, waterfallEndFreq - waterfallStartFreq); + + // Try WebSocket path first (I/Q + server-side FFT) + const centerFreq = (startFreq + endFreq) / 2; + const spanMhz = Math.max(0.1, endFreq - startFreq); + + try { + const wsConfig = { + center_freq: centerFreq, + span_mhz: spanMhz, + gain: gain, + device: device, + sdr_type: (typeof getSelectedSdrType === 'function') ? getSelectedSdrType() : 'rtlsdr', + fft_size: fftSize, + fps: 25, + avg_count: 4, + }; + await connectWaterfallWebSocket(wsConfig); + + isWaterfallRunning = true; + setWaterfallControlButtons(true); + const waterfallPanel = document.getElementById('waterfallPanel'); + if (waterfallPanel) waterfallPanel.style.display = 'block'; + lastWaterfallDraw = 0; + initWaterfallCanvas(); + if (typeof reserveDevice === 'function') { + reserveDevice(parseInt(device), 'waterfall'); + } + if (resume || resumeRfWaterfallAfterListening) { + resumeRfWaterfallAfterListening = false; + } + if (waterfallResumeTimer) { + clearTimeout(waterfallResumeTimer); + waterfallResumeTimer = null; + } + console.log('[WATERFALL] WebSocket connected'); + return { started: true }; + } catch (wsErr) { + console.log('[WATERFALL] WebSocket unavailable, falling back to SSE:', wsErr.message); + } + + // Fallback: SSE / rtl_power path const segments = Math.max(1, Math.ceil(spanMhz / 2.4)); const targetSweepSeconds = 0.8; const interval = Math.max(0.1, Math.min(0.3, targetSweepSeconds / segments)); + const binSize = fftSize; try { const response = await fetch('/listening/waterfall/start', { @@ -3635,6 +3868,27 @@ async function stopWaterfall() { return; } + // WebSocket path + if (waterfallUseWebSocket && waterfallWebSocket) { + try { + if (waterfallWebSocket.readyState === WebSocket.OPEN) { + waterfallWebSocket.send(JSON.stringify({ cmd: 'stop' })); + } + waterfallWebSocket.close(); + } catch (e) { + console.error('[WATERFALL] WebSocket stop error:', e); + } + waterfallWebSocket = null; + waterfallUseWebSocket = false; + isWaterfallRunning = false; + setWaterfallControlButtons(false); + if (typeof releaseDevice === 'function') { + releaseDevice('waterfall'); + } + return; + } + + // SSE fallback path try { await fetch('/listening/waterfall/stop', { method: 'POST' }); isWaterfallRunning = false; diff --git a/templates/index.html b/templates/index.html index 75bed94..526df0c 100644 --- a/templates/index.html +++ b/templates/index.html @@ -525,12 +525,12 @@
- - + + + +
diff --git a/tests/test_waterfall_fft.py b/tests/test_waterfall_fft.py new file mode 100644 index 0000000..722569e --- /dev/null +++ b/tests/test_waterfall_fft.py @@ -0,0 +1,168 @@ +"""Tests for the waterfall FFT pipeline.""" + +import struct + +import numpy as np +import pytest + +from utils.waterfall_fft import ( + build_binary_frame, + compute_power_spectrum, + cu8_to_complex, + quantize_to_uint8, +) + + +class TestCu8ToComplex: + """Tests for cu8_to_complex conversion.""" + + def test_zero_maps_to_negative_one(self): + # I=0, Q=0 -> approximately -1 - 1j + result = cu8_to_complex(bytes([0, 0])) + assert result[0].real == pytest.approx(-1.0, abs=0.01) + assert result[0].imag == pytest.approx(-1.0, abs=0.01) + + def test_255_maps_to_positive_one(self): + # I=255, Q=255 -> approximately +1 + 1j + result = cu8_to_complex(bytes([255, 255])) + assert result[0].real == pytest.approx(1.0, abs=0.01) + assert result[0].imag == pytest.approx(1.0, abs=0.01) + + def test_128_maps_to_near_zero(self): + # I=128, Q=128 -> approximately 0 + 0j + result = cu8_to_complex(bytes([128, 128])) + assert abs(result[0].real) < 0.01 + assert abs(result[0].imag) < 0.01 + + def test_output_length(self): + raw = bytes(range(256)) * 4 # 1024 bytes -> 512 complex samples + result = cu8_to_complex(raw) + assert len(result) == 512 + + def test_output_dtype(self): + result = cu8_to_complex(bytes([100, 200, 50, 150])) + assert result.dtype == np.complex64 or np.issubdtype(result.dtype, np.complexfloating) + + +class TestComputePowerSpectrum: + """Tests for compute_power_spectrum.""" + + def test_output_length_matches_fft_size(self): + samples = np.zeros(4096, dtype=np.complex64) + result = compute_power_spectrum(samples, fft_size=1024, avg_count=4) + assert len(result) == 1024 + + def test_output_dtype(self): + samples = np.zeros(4096, dtype=np.complex64) + result = compute_power_spectrum(samples, fft_size=1024, avg_count=4) + assert result.dtype == np.float32 + + def test_pure_tone_peak_at_correct_bin(self): + fft_size = 1024 + avg_count = 4 + n = fft_size * avg_count + # Generate a pure tone at bin 256 (1/4 of sample rate) + t = np.arange(n, dtype=np.float32) + freq_bin = 256 + tone = np.exp(2j * np.pi * freq_bin / fft_size * t).astype(np.complex64) + result = compute_power_spectrum(tone, fft_size=fft_size, avg_count=avg_count) + # After fftshift, bin 256 maps to index 256 + 512 = 768 + peak_idx = np.argmax(result) + expected_idx = fft_size // 2 + freq_bin + assert peak_idx == expected_idx + + def test_insufficient_samples_returns_default(self): + # Not enough samples for even one segment + samples = np.zeros(100, dtype=np.complex64) + result = compute_power_spectrum(samples, fft_size=1024, avg_count=4) + assert len(result) == 1024 + assert np.all(result == -100.0) + + def test_partial_avg_count(self): + # Only enough for 2 of 4 requested averages + fft_size = 1024 + samples = np.random.randn(2048).astype(np.float32).view(np.complex64) + result = compute_power_spectrum(samples, fft_size=fft_size, avg_count=4) + assert len(result) == fft_size + # Should still return valid dB values (not -100 default) + assert np.any(result != -100.0) + + +class TestQuantizeToUint8: + """Tests for quantize_to_uint8.""" + + def test_db_min_maps_to_zero(self): + power = np.array([-90.0], dtype=np.float32) + result = quantize_to_uint8(power, db_min=-90, db_max=-20) + assert result[0] == 0 + + def test_db_max_maps_to_255(self): + power = np.array([-20.0], dtype=np.float32) + result = quantize_to_uint8(power, db_min=-90, db_max=-20) + assert result[0] == 255 + + def test_below_min_clamped_to_zero(self): + power = np.array([-120.0], dtype=np.float32) + result = quantize_to_uint8(power, db_min=-90, db_max=-20) + assert result[0] == 0 + + def test_above_max_clamped_to_255(self): + power = np.array([0.0], dtype=np.float32) + result = quantize_to_uint8(power, db_min=-90, db_max=-20) + assert result[0] == 255 + + def test_midpoint(self): + # Midpoint between -90 and -20 is -55 -> ~127-128 + power = np.array([-55.0], dtype=np.float32) + result = quantize_to_uint8(power, db_min=-90, db_max=-20) + assert 125 <= result[0] <= 130 + + def test_output_length(self): + power = np.random.randn(1024).astype(np.float32) * 30 - 60 + result = quantize_to_uint8(power) + assert len(result) == 1024 + + +class TestBuildBinaryFrame: + """Tests for build_binary_frame.""" + + def test_header_values(self): + bins = bytes([128] * 1024) + frame = build_binary_frame(100.0, 102.0, bins) + msg_type = frame[0] + start_freq, end_freq = struct.unpack_from(' list[str]: + """ + Build rx_sdr command for raw I/Q capture with Airspy. + + Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display. + """ + device_str = self._build_device_string(device) + freq_hz = int(frequency_mhz * 1e6) + + cmd = [ + 'rx_sdr', + '-d', device_str, + '-f', str(freq_hz), + '-s', str(sample_rate), + '-F', 'CU8', + ] + + if gain is not None and gain > 0: + cmd.extend(['-g', self._format_gain(gain)]) + + if bias_t: + cmd.append('-T') + + # Output to stdout + cmd.append('-') + + return cmd + def get_capabilities(self) -> SDRCapabilities: """Return Airspy capabilities.""" return self.CAPABILITIES diff --git a/utils/sdr/base.py b/utils/sdr/base.py index 4dc79be..e7f84ba 100644 --- a/utils/sdr/base.py +++ b/utils/sdr/base.py @@ -186,6 +186,41 @@ class CommandBuilder(ABC): """Return hardware capabilities for this SDR type.""" pass + def build_iq_capture_command( + self, + device: SDRDevice, + frequency_mhz: float, + sample_rate: int = 2048000, + gain: Optional[float] = None, + ppm: Optional[int] = None, + bias_t: bool = False, + output_format: str = 'cu8', + ) -> list[str]: + """ + Build raw I/Q capture command for streaming samples to stdout. + + Used for real-time waterfall/spectrum display. Output is unsigned + 8-bit I/Q pairs (cu8) written continuously to stdout. + + Args: + device: The SDR device to use + frequency_mhz: Center frequency in MHz + sample_rate: Sample rate in Hz (default 2048000) + gain: Gain in dB (None for auto) + ppm: PPM frequency correction + bias_t: Enable bias-T power (for active antennas) + output_format: Output sample format (default 'cu8') + + Returns: + Command as list of strings for subprocess + + Raises: + NotImplementedError: If the SDR type does not support I/Q capture. + """ + raise NotImplementedError( + f"{self.__class__.__name__} does not support raw I/Q capture" + ) + @classmethod @abstractmethod def get_sdr_type(cls) -> SDRType: diff --git a/utils/sdr/hackrf.py b/utils/sdr/hackrf.py index ea3a24e..63a5fd6 100644 --- a/utils/sdr/hackrf.py +++ b/utils/sdr/hackrf.py @@ -185,6 +185,44 @@ class HackRFCommandBuilder(CommandBuilder): return cmd + def build_iq_capture_command( + self, + device: SDRDevice, + frequency_mhz: float, + sample_rate: int = 2048000, + gain: Optional[float] = None, + ppm: Optional[int] = None, + bias_t: bool = False, + output_format: str = 'cu8', + ) -> list[str]: + """ + Build rx_sdr command for raw I/Q capture with HackRF. + + Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display. + """ + device_str = self._build_device_string(device) + freq_hz = int(frequency_mhz * 1e6) + + cmd = [ + 'rx_sdr', + '-d', device_str, + '-f', str(freq_hz), + '-s', str(sample_rate), + '-F', 'CU8', + ] + + if gain is not None and gain > 0: + lna, vga = self._split_gain(gain) + cmd.extend(['-g', f'LNA={lna},VGA={vga}']) + + if bias_t: + cmd.append('-T') + + # Output to stdout + cmd.append('-') + + return cmd + def get_capabilities(self) -> SDRCapabilities: """Return HackRF capabilities.""" return self.CAPABILITIES diff --git a/utils/sdr/limesdr.py b/utils/sdr/limesdr.py index ad9a9d1..3dcd8d2 100644 --- a/utils/sdr/limesdr.py +++ b/utils/sdr/limesdr.py @@ -162,6 +162,41 @@ class LimeSDRCommandBuilder(CommandBuilder): return cmd + def build_iq_capture_command( + self, + device: SDRDevice, + frequency_mhz: float, + sample_rate: int = 2048000, + gain: Optional[float] = None, + ppm: Optional[int] = None, + bias_t: bool = False, + output_format: str = 'cu8', + ) -> list[str]: + """ + Build rx_sdr command for raw I/Q capture with LimeSDR. + + Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display. + Note: LimeSDR does not support bias-T, parameter is ignored. + """ + device_str = self._build_device_string(device) + freq_hz = int(frequency_mhz * 1e6) + + cmd = [ + 'rx_sdr', + '-d', device_str, + '-f', str(freq_hz), + '-s', str(sample_rate), + '-F', 'CU8', + ] + + if gain is not None and gain > 0: + cmd.extend(['-g', f'LNAH={int(gain)}']) + + # Output to stdout + cmd.append('-') + + return cmd + def get_capabilities(self) -> SDRCapabilities: """Return LimeSDR capabilities.""" return self.CAPABILITIES diff --git a/utils/sdr/rtlsdr.py b/utils/sdr/rtlsdr.py index 6d2b8d8..25b4495 100644 --- a/utils/sdr/rtlsdr.py +++ b/utils/sdr/rtlsdr.py @@ -231,6 +231,45 @@ class RTLSDRCommandBuilder(CommandBuilder): return cmd + def build_iq_capture_command( + self, + device: SDRDevice, + frequency_mhz: float, + sample_rate: int = 2048000, + gain: Optional[float] = None, + ppm: Optional[int] = None, + bias_t: bool = False, + output_format: str = 'cu8', + ) -> list[str]: + """ + Build rtl_sdr command for raw I/Q capture. + + Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display. + """ + rtl_sdr_path = get_tool_path('rtl_sdr') or 'rtl_sdr' + freq_hz = int(frequency_mhz * 1e6) + + cmd = [ + rtl_sdr_path, + '-d', self._get_device_arg(device), + '-f', str(freq_hz), + '-s', str(sample_rate), + ] + + if gain is not None and gain > 0: + cmd.extend(['-g', str(gain)]) + + if ppm is not None and ppm != 0: + cmd.extend(['-p', str(ppm)]) + + if bias_t: + cmd.append('-T') + + # Output to stdout + cmd.append('-') + + return cmd + def get_capabilities(self) -> SDRCapabilities: """Return RTL-SDR capabilities.""" return self.CAPABILITIES diff --git a/utils/sdr/sdrplay.py b/utils/sdr/sdrplay.py index 240e286..79df27c 100644 --- a/utils/sdr/sdrplay.py +++ b/utils/sdr/sdrplay.py @@ -163,6 +163,43 @@ class SDRPlayCommandBuilder(CommandBuilder): return cmd + def build_iq_capture_command( + self, + device: SDRDevice, + frequency_mhz: float, + sample_rate: int = 2048000, + gain: Optional[float] = None, + ppm: Optional[int] = None, + bias_t: bool = False, + output_format: str = 'cu8', + ) -> list[str]: + """ + Build rx_sdr command for raw I/Q capture with SDRPlay. + + Outputs unsigned 8-bit I/Q pairs to stdout for waterfall display. + """ + device_str = self._build_device_string(device) + freq_hz = int(frequency_mhz * 1e6) + + cmd = [ + 'rx_sdr', + '-d', device_str, + '-f', str(freq_hz), + '-s', str(sample_rate), + '-F', 'CU8', + ] + + if gain is not None and gain > 0: + cmd.extend(['-g', f'IFGR={int(gain)}']) + + if bias_t: + cmd.append('-T') + + # Output to stdout + cmd.append('-') + + return cmd + def get_capabilities(self) -> SDRCapabilities: """Return SDRPlay capabilities.""" return self.CAPABILITIES diff --git a/utils/waterfall_fft.py b/utils/waterfall_fft.py new file mode 100644 index 0000000..bf688c7 --- /dev/null +++ b/utils/waterfall_fft.py @@ -0,0 +1,122 @@ +"""FFT pipeline for real-time waterfall display. + +Converts raw I/Q samples from SDR hardware into quantized power spectrum +frames suitable for binary WebSocket transmission. +""" + +from __future__ import annotations + +import struct + +import numpy as np + + +def cu8_to_complex(raw: bytes) -> np.ndarray: + """Convert unsigned 8-bit I/Q bytes to complex64. + + RTL-SDR (and rx_sdr with -F cu8) outputs interleaved unsigned 8-bit + I/Q pairs where 128 is the zero point. + + Args: + raw: Raw bytes, length must be even (I/Q pairs). + + Returns: + Complex64 array of length len(raw) // 2. + """ + iq = np.frombuffer(raw, dtype=np.uint8).astype(np.float32) + # Normalize: 0 -> -1.0, 128 -> ~0.0, 255 -> +1.0 + iq = (iq - 127.5) / 127.5 + return iq[0::2] + 1j * iq[1::2] + + +def compute_power_spectrum( + samples: np.ndarray, + fft_size: int = 1024, + avg_count: int = 4, +) -> np.ndarray: + """Compute averaged power spectrum in dBm. + + Applies a Hann window, computes FFT, converts to power (dB), + and averages over multiple segments. + + Args: + samples: Complex64 array, length >= fft_size * avg_count. + fft_size: Number of FFT bins. + avg_count: Number of segments to average. + + Returns: + Float32 array of length fft_size with power in dB (fftshift'd). + """ + window = np.hanning(fft_size).astype(np.float32) + accum = np.zeros(fft_size, dtype=np.float32) + actual_avg = 0 + + for i in range(avg_count): + offset = i * fft_size + if offset + fft_size > len(samples): + break + segment = samples[offset : offset + fft_size] * window + spectrum = np.fft.fft(segment) + power = np.real(spectrum * np.conj(spectrum)) + # Avoid log10(0) + power = np.maximum(power, 1e-20) + accum += 10.0 * np.log10(power) + actual_avg += 1 + + if actual_avg == 0: + return np.full(fft_size, -100.0, dtype=np.float32) + + accum /= actual_avg + return np.fft.fftshift(accum).astype(np.float32) + + +def quantize_to_uint8( + power_db: np.ndarray, + db_min: float = -90.0, + db_max: float = -20.0, +) -> bytes: + """Clamp and scale dB values to 0-255. + + Args: + power_db: Float32 array of power values in dB. + db_min: Value mapped to 0. + db_max: Value mapped to 255. + + Returns: + Bytes of length len(power_db), each in [0, 255]. + """ + db_range = db_max - db_min + if db_range <= 0: + db_range = 1.0 + scaled = (power_db - db_min) / db_range * 255.0 + clamped = np.clip(scaled, 0, 255).astype(np.uint8) + return clamped.tobytes() + + +def build_binary_frame( + start_freq: float, + end_freq: float, + quantized_bins: bytes, +) -> bytes: + """Pack a binary waterfall frame for WebSocket transmission. + + Wire format (little-endian): + [uint8 msg_type=0x01] + [float32 start_freq] + [float32 end_freq] + [uint16 bin_count] + [uint8[] bins] + + Total size = 11 + bin_count bytes. + + Args: + start_freq: Start frequency in MHz. + end_freq: End frequency in MHz. + quantized_bins: Pre-quantized uint8 bin data. + + Returns: + Binary frame bytes. + """ + bin_count = len(quantized_bins) + header = struct.pack('