diff --git a/app.py b/app.py index cf1934a..71386e4 100644 --- a/app.py +++ b/app.py @@ -232,6 +232,13 @@ cleanup_manager.register(ais_vessels) cleanup_manager.register(dsc_messages) cleanup_manager.register(deauth_alerts) +# ============================================ +# WATERFALL SOURCE TRACKING +# ============================================ +# Tracks whether waterfall data is being produced by a decoder's IQ pipeline +# None = no active source, 'rtl_power' = standalone, 'pager'/'sensor' = decoder-driven +waterfall_source: str | None = None + # ============================================ # SDR DEVICE REGISTRY # ============================================ @@ -659,7 +666,7 @@ def kill_all() -> Response: """Kill all decoder, WiFi, and Bluetooth processes.""" global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process - global dmr_process, dmr_rtl_process + global dmr_process, dmr_rtl_process, waterfall_source # Import adsb and ais modules to reset their state from routes import adsb as adsb_module @@ -668,7 +675,7 @@ def kill_all() -> Response: killed = [] processes_to_kill = [ - 'rtl_fm', 'multimon-ng', 'rtl_433', + 'rtl_fm', 'multimon-ng', 'rtl_433', 'rtl_sdr', 'airodump-ng', 'aireplay-ng', 'airmon-ng', 'dump1090', 'acarsdec', 'direwolf', 'AIS-catcher', 'hcitool', 'bluetoothctl', 'dsd', @@ -741,9 +748,10 @@ def kill_all() -> Response: except Exception: pass - # Clear SDR device registry + # Clear SDR device registry and waterfall source with sdr_device_registry_lock: sdr_device_registry.clear() + waterfall_source = None return jsonify({'status': 'killed', 'processes': killed}) diff --git a/routes/listening_post.py b/routes/listening_post.py index 658acdb..3121ff5 100644 --- a/routes/listening_post.py +++ b/routes/listening_post.py @@ -19,8 +19,8 @@ from flask import Blueprint, jsonify, request, Response import app as app_module from utils.logging 
import get_logger -from utils.sse import format_sse -from utils.event_pipeline import process_event +from utils.sse import format_sse +from utils.event_pipeline import process_event from utils.constants import ( SSE_QUEUE_TIMEOUT, SSE_KEEPALIVE_INTERVAL, @@ -1181,13 +1181,13 @@ def stream_scanner_events() -> Response: while True: try: - msg = scanner_queue.get(timeout=SSE_QUEUE_TIMEOUT) - last_keepalive = time.time() - try: - process_event('listening_scanner', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) + msg = scanner_queue.get(timeout=SSE_QUEUE_TIMEOUT) + last_keepalive = time.time() + try: + process_event('listening_scanner', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL: @@ -1239,10 +1239,10 @@ def get_presets() -> Response: # MANUAL AUDIO ENDPOINTS (for direct listening) # ============================================ -@listening_post_bp.route('/audio/start', methods=['POST']) -def start_audio() -> Response: - """Start audio at specific frequency (manual mode).""" - global scanner_running, scanner_active_device, listening_active_device, scanner_power_process, scanner_thread +@listening_post_bp.route('/audio/start', methods=['POST']) +def start_audio() -> Response: + """Start audio at specific frequency (manual mode).""" + global scanner_running, scanner_active_device, listening_active_device, scanner_power_process, scanner_thread # Stop scanner if running if scanner_running: @@ -1271,7 +1271,7 @@ def start_audio() -> Response: pass time.sleep(0.5) - data = request.json or {} + data = request.json or {} try: frequency = float(data.get('frequency', 0)) @@ -1286,11 +1286,11 @@ def start_audio() -> Response: 'message': f'Invalid parameter: {e}' }), 400 - if frequency <= 0: - return jsonify({ - 'status': 'error', - 'message': 'frequency is required' - }), 400 + if frequency <= 0: + return jsonify({ + 'status': 'error', + 
'message': 'frequency is required' + }), 400 valid_sdr_types = ['rtlsdr', 'hackrf', 'airspy', 'limesdr', 'sdrplay'] if sdr_type not in valid_sdr_types: @@ -1299,19 +1299,19 @@ def start_audio() -> Response: 'message': f'Invalid sdr_type. Use: {", ".join(valid_sdr_types)}' }), 400 - # Update config for audio - scanner_config['squelch'] = squelch - scanner_config['gain'] = gain - scanner_config['device'] = device - scanner_config['sdr_type'] = sdr_type - - # Stop waterfall if it's using the same SDR - if waterfall_running and waterfall_active_device == device: - _stop_waterfall_internal() - time.sleep(0.2) + # Update config for audio + scanner_config['squelch'] = squelch + scanner_config['gain'] = gain + scanner_config['device'] = device + scanner_config['sdr_type'] = sdr_type - # Claim device for listening audio - if listening_active_device is None or listening_active_device != device: + # Stop waterfall if it's using the same SDR + if waterfall_running and waterfall_active_device == device: + _stop_waterfall_internal() + time.sleep(0.2) + + # Claim device for listening audio + if listening_active_device is None or listening_active_device != device: if listening_active_device is not None: app_module.release_sdr_device(listening_active_device) error = app_module.claim_sdr_device(device, 'listening') @@ -1524,207 +1524,217 @@ waterfall_thread: Optional[threading.Thread] = None waterfall_running = False waterfall_lock = threading.Lock() waterfall_queue: queue.Queue = queue.Queue(maxsize=200) -waterfall_active_device: Optional[int] = None -waterfall_config = { - 'start_freq': 88.0, - 'end_freq': 108.0, - 'bin_size': 10000, - 'gain': 40, - 'device': 0, - 'max_bins': 1024, - 'interval': 0.4, -} +waterfall_active_device: Optional[int] = None +waterfall_config = { + 'start_freq': 88.0, + 'end_freq': 108.0, + 'bin_size': 10000, + 'gain': 40, + 'device': 0, + 'max_bins': 1024, + 'interval': 0.4, +} -def _parse_rtl_power_line(line: str) -> tuple[str | None, float | None, float 
| None, list[float]]: - """Parse a single rtl_power CSV line into bins.""" - if not line or line.startswith('#'): - return None, None, None, [] - - parts = [p.strip() for p in line.split(',')] - if len(parts) < 6: - return None, None, None, [] - - # Timestamp in first two fields (YYYY-MM-DD, HH:MM:SS) - timestamp = f"{parts[0]} {parts[1]}" if len(parts) >= 2 else parts[0] - - start_idx = None - for i, tok in enumerate(parts): - try: - val = float(tok) - except ValueError: - continue - if val > 1e5: - start_idx = i - break - if start_idx is None or len(parts) < start_idx + 4: - return timestamp, None, None, [] - - try: - seg_start = float(parts[start_idx]) - seg_end = float(parts[start_idx + 1]) - raw_values = [] - for v in parts[start_idx + 3:]: - try: - raw_values.append(float(v)) - except ValueError: - continue - if raw_values and raw_values[0] >= 0 and any(val < 0 for val in raw_values[1:]): - raw_values = raw_values[1:] - return timestamp, seg_start, seg_end, raw_values - except ValueError: - return timestamp, None, None, [] - - -def _waterfall_loop(): - """Continuous rtl_power sweep loop emitting waterfall data.""" - global waterfall_running, waterfall_process - - rtl_power_path = find_rtl_power() - if not rtl_power_path: - logger.error("rtl_power not found for waterfall") - waterfall_running = False - return - - start_hz = int(waterfall_config['start_freq'] * 1e6) - end_hz = int(waterfall_config['end_freq'] * 1e6) - bin_hz = int(waterfall_config['bin_size']) - gain = waterfall_config['gain'] - device = waterfall_config['device'] - interval = float(waterfall_config.get('interval', 0.4)) - - cmd = [ - rtl_power_path, - '-f', f'{start_hz}:{end_hz}:{bin_hz}', - '-i', str(interval), - '-g', str(gain), - '-d', str(device), - ] - - try: - waterfall_process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - bufsize=1, - text=True, - ) - - current_ts = None - all_bins: list[float] = [] - sweep_start_hz = start_hz - sweep_end_hz = 
end_hz - - if not waterfall_process.stdout: - return - - for line in waterfall_process.stdout: - if not waterfall_running: - break - - ts, seg_start, seg_end, bins = _parse_rtl_power_line(line) - if ts is None or not bins: - continue - - if current_ts is None: - current_ts = ts - - if ts != current_ts and all_bins: - max_bins = int(waterfall_config.get('max_bins') or 0) - bins_to_send = all_bins - if max_bins > 0 and len(bins_to_send) > max_bins: - bins_to_send = _downsample_bins(bins_to_send, max_bins) - msg = { - 'type': 'waterfall_sweep', - 'start_freq': sweep_start_hz / 1e6, - 'end_freq': sweep_end_hz / 1e6, - 'bins': bins_to_send, - 'timestamp': datetime.now().isoformat(), - } - try: - waterfall_queue.put_nowait(msg) - except queue.Full: - try: - waterfall_queue.get_nowait() - except queue.Empty: - pass - try: - waterfall_queue.put_nowait(msg) - except queue.Full: - pass - - all_bins = [] - sweep_start_hz = start_hz - sweep_end_hz = end_hz - current_ts = ts - - all_bins.extend(bins) - if seg_start is not None: - sweep_start_hz = min(sweep_start_hz, seg_start) - if seg_end is not None: - sweep_end_hz = max(sweep_end_hz, seg_end) - - # Flush any remaining bins - if all_bins and waterfall_running: - max_bins = int(waterfall_config.get('max_bins') or 0) - bins_to_send = all_bins - if max_bins > 0 and len(bins_to_send) > max_bins: - bins_to_send = _downsample_bins(bins_to_send, max_bins) - msg = { - 'type': 'waterfall_sweep', - 'start_freq': sweep_start_hz / 1e6, - 'end_freq': sweep_end_hz / 1e6, - 'bins': bins_to_send, - 'timestamp': datetime.now().isoformat(), - } - try: - waterfall_queue.put_nowait(msg) - except queue.Full: - pass - - except Exception as e: - logger.error(f"Waterfall loop error: {e}") - finally: - waterfall_running = False - if waterfall_process and waterfall_process.poll() is None: - try: - waterfall_process.terminate() - waterfall_process.wait(timeout=1) - except Exception: - try: - waterfall_process.kill() - except Exception: - pass - 
waterfall_process = None - logger.info("Waterfall loop stopped") - - -def _stop_waterfall_internal() -> None: - """Stop the waterfall display and release resources.""" - global waterfall_running, waterfall_process, waterfall_active_device - - waterfall_running = False - if waterfall_process and waterfall_process.poll() is None: - try: - waterfall_process.terminate() - waterfall_process.wait(timeout=1) - except Exception: - try: - waterfall_process.kill() - except Exception: - pass - waterfall_process = None - - if waterfall_active_device is not None: - app_module.release_sdr_device(waterfall_active_device) - waterfall_active_device = None +def _parse_rtl_power_line(line: str) -> tuple[str | None, float | None, float | None, list[float]]: + """Parse a single rtl_power CSV line into bins.""" + if not line or line.startswith('#'): + return None, None, None, [] + + parts = [p.strip() for p in line.split(',')] + if len(parts) < 6: + return None, None, None, [] + + # Timestamp in first two fields (YYYY-MM-DD, HH:MM:SS) + timestamp = f"{parts[0]} {parts[1]}" if len(parts) >= 2 else parts[0] + + start_idx = None + for i, tok in enumerate(parts): + try: + val = float(tok) + except ValueError: + continue + if val > 1e5: + start_idx = i + break + if start_idx is None or len(parts) < start_idx + 4: + return timestamp, None, None, [] + + try: + seg_start = float(parts[start_idx]) + seg_end = float(parts[start_idx + 1]) + raw_values = [] + for v in parts[start_idx + 3:]: + try: + raw_values.append(float(v)) + except ValueError: + continue + if raw_values and raw_values[0] >= 0 and any(val < 0 for val in raw_values[1:]): + raw_values = raw_values[1:] + return timestamp, seg_start, seg_end, raw_values + except ValueError: + return timestamp, None, None, [] + + +def _waterfall_loop(): + """Continuous rtl_power sweep loop emitting waterfall data.""" + global waterfall_running, waterfall_process + + rtl_power_path = find_rtl_power() + if not rtl_power_path: + logger.error("rtl_power 
not found for waterfall") + waterfall_running = False + return + + start_hz = int(waterfall_config['start_freq'] * 1e6) + end_hz = int(waterfall_config['end_freq'] * 1e6) + bin_hz = int(waterfall_config['bin_size']) + gain = waterfall_config['gain'] + device = waterfall_config['device'] + interval = float(waterfall_config.get('interval', 0.4)) + + cmd = [ + rtl_power_path, + '-f', f'{start_hz}:{end_hz}:{bin_hz}', + '-i', str(interval), + '-g', str(gain), + '-d', str(device), + ] + + try: + waterfall_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + bufsize=1, + text=True, + ) + + current_ts = None + all_bins: list[float] = [] + sweep_start_hz = start_hz + sweep_end_hz = end_hz + + if not waterfall_process.stdout: + return + + for line in waterfall_process.stdout: + if not waterfall_running: + break + + ts, seg_start, seg_end, bins = _parse_rtl_power_line(line) + if ts is None or not bins: + continue + + if current_ts is None: + current_ts = ts + + if ts != current_ts and all_bins: + max_bins = int(waterfall_config.get('max_bins') or 0) + bins_to_send = all_bins + if max_bins > 0 and len(bins_to_send) > max_bins: + bins_to_send = _downsample_bins(bins_to_send, max_bins) + msg = { + 'type': 'waterfall_sweep', + 'start_freq': sweep_start_hz / 1e6, + 'end_freq': sweep_end_hz / 1e6, + 'bins': bins_to_send, + 'timestamp': datetime.now().isoformat(), + } + try: + waterfall_queue.put_nowait(msg) + except queue.Full: + try: + waterfall_queue.get_nowait() + except queue.Empty: + pass + try: + waterfall_queue.put_nowait(msg) + except queue.Full: + pass + + all_bins = [] + sweep_start_hz = start_hz + sweep_end_hz = end_hz + current_ts = ts + + all_bins.extend(bins) + if seg_start is not None: + sweep_start_hz = min(sweep_start_hz, seg_start) + if seg_end is not None: + sweep_end_hz = max(sweep_end_hz, seg_end) + + # Flush any remaining bins + if all_bins and waterfall_running: + max_bins = int(waterfall_config.get('max_bins') or 0) + 
bins_to_send = all_bins + if max_bins > 0 and len(bins_to_send) > max_bins: + bins_to_send = _downsample_bins(bins_to_send, max_bins) + msg = { + 'type': 'waterfall_sweep', + 'start_freq': sweep_start_hz / 1e6, + 'end_freq': sweep_end_hz / 1e6, + 'bins': bins_to_send, + 'timestamp': datetime.now().isoformat(), + } + try: + waterfall_queue.put_nowait(msg) + except queue.Full: + pass + + except Exception as e: + logger.error(f"Waterfall loop error: {e}") + finally: + waterfall_running = False + if waterfall_process and waterfall_process.poll() is None: + try: + waterfall_process.terminate() + waterfall_process.wait(timeout=1) + except Exception: + try: + waterfall_process.kill() + except Exception: + pass + waterfall_process = None + logger.info("Waterfall loop stopped") + + +def _stop_waterfall_internal() -> None: + """Stop the waterfall display and release resources.""" + global waterfall_running, waterfall_process, waterfall_active_device + + waterfall_running = False + if waterfall_process and waterfall_process.poll() is None: + try: + waterfall_process.terminate() + waterfall_process.wait(timeout=1) + except Exception: + try: + waterfall_process.kill() + except Exception: + pass + waterfall_process = None + + if waterfall_active_device is not None: + app_module.release_sdr_device(waterfall_active_device) + waterfall_active_device = None @listening_post_bp.route('/waterfall/start', methods=['POST']) -def start_waterfall() -> Response: +def start_waterfall() -> Response: """Start the waterfall/spectrogram display.""" global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device + # Check if a decoder is already producing FFT data via IQ pipeline + if app_module.waterfall_source in ('pager', 'sensor'): + # Decoder-driven waterfall: data is already flowing into waterfall_queue + waterfall_running = True + return jsonify({ + 'status': 'started', + 'source': 'decoder', + 'decoder': app_module.waterfall_source, + }) + with waterfall_lock: if 
waterfall_running: return jsonify({'status': 'error', 'message': 'Waterfall already running'}), 409 @@ -1734,24 +1744,24 @@ def start_waterfall() -> Response: data = request.json or {} - try: - waterfall_config['start_freq'] = float(data.get('start_freq', 88.0)) - waterfall_config['end_freq'] = float(data.get('end_freq', 108.0)) - waterfall_config['bin_size'] = int(data.get('bin_size', 10000)) - waterfall_config['gain'] = int(data.get('gain', 40)) - waterfall_config['device'] = int(data.get('device', 0)) - if data.get('interval') is not None: - interval = float(data.get('interval', waterfall_config['interval'])) - if interval < 0.1 or interval > 5: - return jsonify({'status': 'error', 'message': 'interval must be between 0.1 and 5 seconds'}), 400 - waterfall_config['interval'] = interval - if data.get('max_bins') is not None: - max_bins = int(data.get('max_bins', waterfall_config['max_bins'])) - if max_bins < 64 or max_bins > 4096: - return jsonify({'status': 'error', 'message': 'max_bins must be between 64 and 4096'}), 400 - waterfall_config['max_bins'] = max_bins - except (ValueError, TypeError) as e: - return jsonify({'status': 'error', 'message': f'Invalid parameter: {e}'}), 400 + try: + waterfall_config['start_freq'] = float(data.get('start_freq', 88.0)) + waterfall_config['end_freq'] = float(data.get('end_freq', 108.0)) + waterfall_config['bin_size'] = int(data.get('bin_size', 10000)) + waterfall_config['gain'] = int(data.get('gain', 40)) + waterfall_config['device'] = int(data.get('device', 0)) + if data.get('interval') is not None: + interval = float(data.get('interval', waterfall_config['interval'])) + if interval < 0.1 or interval > 5: + return jsonify({'status': 'error', 'message': 'interval must be between 0.1 and 5 seconds'}), 400 + waterfall_config['interval'] = interval + if data.get('max_bins') is not None: + max_bins = int(data.get('max_bins', waterfall_config['max_bins'])) + if max_bins < 64 or max_bins > 4096: + return jsonify({'status': 'error', 
'message': 'max_bins must be between 64 and 4096'}), 400 + waterfall_config['max_bins'] = max_bins + except (ValueError, TypeError) as e: + return jsonify({'status': 'error', 'message': f'Invalid parameter: {e}'}), 400 if waterfall_config['start_freq'] >= waterfall_config['end_freq']: return jsonify({'status': 'error', 'message': 'start_freq must be less than end_freq'}), 400 @@ -1777,11 +1787,18 @@ def start_waterfall() -> Response: @listening_post_bp.route('/waterfall/stop', methods=['POST']) -def stop_waterfall() -> Response: - """Stop the waterfall display.""" - _stop_waterfall_internal() - - return jsonify({'status': 'stopped'}) +def stop_waterfall() -> Response: + """Stop the waterfall display.""" + global waterfall_running + + # If waterfall is decoder-driven, just disconnect (don't stop the decoder) + if app_module.waterfall_source in ('pager', 'sensor'): + waterfall_running = False + return jsonify({'status': 'stopped', 'source': 'decoder'}) + + _stop_waterfall_internal() + + return jsonify({'status': 'stopped'}) @listening_post_bp.route('/waterfall/stream') @@ -1790,14 +1807,14 @@ def stream_waterfall() -> Response: def generate() -> Generator[str, None, None]: last_keepalive = time.time() while True: - try: - msg = waterfall_queue.get(timeout=SSE_QUEUE_TIMEOUT) - last_keepalive = time.time() - try: - process_event('waterfall', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) + try: + msg = waterfall_queue.get(timeout=SSE_QUEUE_TIMEOUT) + last_keepalive = time.time() + try: + process_event('waterfall', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL: @@ -1808,20 +1825,20 @@ def stream_waterfall() -> Response: response.headers['Cache-Control'] = 'no-cache' response.headers['X-Accel-Buffering'] = 'no' return response -def _downsample_bins(values: list[float], target: int) -> list[float]: - """Downsample bins to a target 
length using simple averaging.""" - if target <= 0 or len(values) <= target: - return values - - out: list[float] = [] - step = len(values) / target - for i in range(target): - start = int(i * step) - end = int((i + 1) * step) - if end <= start: - end = min(start + 1, len(values)) - chunk = values[start:end] - if not chunk: - continue - out.append(sum(chunk) / len(chunk)) - return out +def _downsample_bins(values: list[float], target: int) -> list[float]: + """Downsample bins to a target length using simple averaging.""" + if target <= 0 or len(values) <= target: + return values + + out: list[float] = [] + step = len(values) / target + for i in range(target): + start = int(i * step) + end = int((i + 1) * step) + if end <= start: + end = min(start + 1, len(values)) + chunk = values[start:end] + if not chunk: + continue + out.append(sum(chunk) / len(chunk)) + return out diff --git a/routes/pager.py b/routes/pager.py index 4ee5425..ce5a8ce 100644 --- a/routes/pager.py +++ b/routes/pager.py @@ -22,8 +22,8 @@ from utils.validation import ( validate_frequency, validate_device_index, validate_gain, validate_ppm, validate_rtl_tcp_host, validate_rtl_tcp_port ) -from utils.sse import format_sse -from utils.event_pipeline import process_event +from utils.sse import format_sse +from utils.event_pipeline import process_event from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType, SDRValidationError from utils.dependencies import get_tool_path @@ -32,6 +32,8 @@ pager_bp = Blueprint('pager', __name__) # Track which device is being used pager_active_device: int | None = None +# IQ pipeline stop event (set to signal IQ processor thread to exit) +pager_iq_stop_event: threading.Event | None = None def parse_multimon_output(line: str) -> dict[str, str] | None: @@ -147,12 +149,18 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.output_queue.put({'type': 
'error', 'text': str(e)}) finally: - global pager_active_device + global pager_active_device, pager_iq_stop_event try: os.close(master_fd) except OSError: pass - # Cleanup companion rtl_fm process and decoder + # Stop IQ pipeline if running + if pager_iq_stop_event is not None: + pager_iq_stop_event.set() + pager_iq_stop_event = None + if app_module.waterfall_source == 'pager': + app_module.waterfall_source = None + # Cleanup companion rtl_sdr/rtl_fm process and decoder with app_module.process_lock: rtl_proc = getattr(app_module.current_process, '_rtl_process', None) for proc in [rtl_proc, process]: @@ -175,6 +183,28 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: pager_active_device = None +def _cleanup_failed_start(rtl_process: subprocess.Popen | None) -> None: + """Clean up after a failed start attempt.""" + global pager_active_device, pager_iq_stop_event + if rtl_process: + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass + if pager_iq_stop_event is not None: + pager_iq_stop_event.set() + pager_iq_stop_event = None + if app_module.waterfall_source == 'pager': + app_module.waterfall_source = None + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device) + pager_active_device = None + + @pager_bp.route('/start', methods=['POST']) def start_decoding() -> Response: - global pager_active_device + global pager_active_device, pager_iq_stop_event @@ -272,115 +302,196 @@ def start_decoding() -> Response: builder = SDRFactory.get_builder(sdr_device.sdr_type) - # Build FM demodulation command - bias_t = data.get('bias_t', False) - rtl_cmd = builder.build_fm_demod_command( - device=sdr_device, - frequency_mhz=freq, - sample_rate=22050, - gain=float(gain) if gain and gain != '0' else None, - ppm=int(ppm) if ppm and ppm != '0' else None, - modulation='fm', - squelch=squelch if squelch and squelch != 0 else None, - bias_t=bias_t - ) - multimon_path =
get_tool_path('multimon-ng') if not multimon_path: + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device) + pager_active_device = None return jsonify({'status': 'error', 'message': 'multimon-ng not found'}), 400 multimon_cmd = [multimon_path, '-t', 'raw'] + decoders + ['-f', 'alpha', '-'] - full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) - logger.info(f"Running: {full_cmd}") + bias_t = data.get('bias_t', False) + gain_val = float(gain) if gain and gain != '0' else None + ppm_val = int(ppm) if ppm and ppm != '0' else None - try: - # Create pipe: rtl_fm | multimon-ng - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE + # Determine if we can use IQ pipeline for live waterfall + use_iq_pipeline = ( + sdr_type == SDRType.RTL_SDR + and not rtl_tcp_host + and get_tool_path('rtl_sdr') is not None + ) + + if use_iq_pipeline: + # IQ pipeline: rtl_sdr -> Python IQ processor -> multimon-ng + iq_sample_rate = 220500 # 22050 * 10 for exact decimation + rtl_cmd = builder.build_raw_capture_command( + device=sdr_device, + frequency_mhz=freq, + sample_rate=iq_sample_rate, + gain=gain_val, + ppm=ppm_val, + bias_t=bias_t, ) - register_process(rtl_process) - # Start a thread to monitor rtl_fm stderr for errors - def monitor_rtl_stderr(): - for line in rtl_process.stderr: - err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - logger.debug(f"[RTL_FM] {err_text}") - app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'}) + full_cmd = ' '.join(rtl_cmd) + ' | [iq_processor] | ' + ' '.join(multimon_cmd) + logger.info(f"Running (IQ pipeline): {full_cmd}") - rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr) - rtl_stderr_thread.daemon = True - rtl_stderr_thread.start() + try: + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + register_process(rtl_process) - # Create a pseudo-terminal for 
multimon-ng output - master_fd, slave_fd = pty.openpty() + # Monitor rtl_sdr stderr + def monitor_rtl_stderr(): + for line in rtl_process.stderr: + err_text = line.decode('utf-8', errors='replace').strip() + if err_text: + logger.debug(f"[rtl_sdr] {err_text}") + app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_sdr] {err_text}'}) - multimon_process = subprocess.Popen( - multimon_cmd, - stdin=rtl_process.stdout, - stdout=slave_fd, - stderr=slave_fd, - close_fds=True + rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True) + rtl_stderr_thread.start() + + # Create PTY for multimon-ng output + master_fd, slave_fd = pty.openpty() + + multimon_process = subprocess.Popen( + multimon_cmd, + stdin=subprocess.PIPE, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True, + ) + register_process(multimon_process) + + os.close(slave_fd) + + # Start IQ processor thread + from routes.listening_post import waterfall_queue + from utils.iq_processor import run_fm_iq_pipeline + + stop_event = threading.Event() + pager_iq_stop_event = stop_event + app_module.waterfall_source = 'pager' + + iq_thread = threading.Thread( + target=run_fm_iq_pipeline, + args=( + rtl_process.stdout, + multimon_process.stdin, + waterfall_queue, + freq, + iq_sample_rate, + stop_event, + ), + daemon=True, + ) + iq_thread.start() + + app_module.current_process = multimon_process + app_module.current_process._rtl_process = rtl_process + app_module.current_process._master_fd = master_fd + + # Start decoder output thread + thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process), daemon=True) + thread.start() + + app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) + return jsonify({'status': 'started', 'command': full_cmd, 'waterfall_source': 'pager'}) + + except FileNotFoundError as e: + _cleanup_failed_start(rtl_process) + return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) + except Exception as e: + 
_cleanup_failed_start(rtl_process) + return jsonify({'status': 'error', 'message': str(e)}) + + else: + # Legacy pipeline: rtl_fm | multimon-ng + rtl_cmd = builder.build_fm_demod_command( + device=sdr_device, + frequency_mhz=freq, + sample_rate=22050, + gain=gain_val, + ppm=ppm_val, + modulation='fm', + squelch=squelch if squelch and squelch != 0 else None, + bias_t=bias_t, ) - register_process(multimon_process) - os.close(slave_fd) - rtl_process.stdout.close() + full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) + logger.info(f"Running: {full_cmd}") - app_module.current_process = multimon_process - app_module.current_process._rtl_process = rtl_process - app_module.current_process._master_fd = master_fd - - # Start output thread with PTY master fd - thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process)) - thread.daemon = True - thread.start() - - app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - - return jsonify({'status': 'started', 'command': full_cmd}) - - except FileNotFoundError as e: - # Kill orphaned rtl_fm process try: - rtl_process.terminate() - rtl_process.wait(timeout=2) - except Exception: - try: - rtl_process.kill() - except Exception: - pass - # Release device on failure - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None - return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) - except Exception as e: - # Kill orphaned rtl_fm process if it was started - try: - rtl_process.terminate() - rtl_process.wait(timeout=2) - except Exception: - try: - rtl_process.kill() - except Exception: - pass - # Release device on failure - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None - return jsonify({'status': 'error', 'message': str(e)}) + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) 
+ register_process(rtl_process) + + # Start a thread to monitor rtl_fm stderr for errors + def monitor_rtl_stderr(): + for line in rtl_process.stderr: + err_text = line.decode('utf-8', errors='replace').strip() + if err_text: + logger.debug(f"[RTL_FM] {err_text}") + app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'}) + + rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True) + rtl_stderr_thread.start() + + # Create a pseudo-terminal for multimon-ng output + master_fd, slave_fd = pty.openpty() + + multimon_process = subprocess.Popen( + multimon_cmd, + stdin=rtl_process.stdout, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True, + ) + register_process(multimon_process) + + os.close(slave_fd) + rtl_process.stdout.close() + + app_module.current_process = multimon_process + app_module.current_process._rtl_process = rtl_process + app_module.current_process._master_fd = master_fd + + # Start output thread with PTY master fd + thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process), daemon=True) + thread.start() + + app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) + return jsonify({'status': 'started', 'command': full_cmd}) + + except FileNotFoundError as e: + _cleanup_failed_start(rtl_process) + return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) + except Exception as e: + _cleanup_failed_start(rtl_process) + return jsonify({'status': 'error', 'message': str(e)}) @pager_bp.route('/stop', methods=['POST']) def stop_decoding() -> Response: - global pager_active_device + global pager_active_device, pager_iq_stop_event with app_module.process_lock: if app_module.current_process: - # Kill rtl_fm process first + # Stop IQ pipeline if running + if pager_iq_stop_event is not None: + pager_iq_stop_event.set() + pager_iq_stop_event = None + if app_module.waterfall_source == 'pager': + app_module.waterfall_source = None + + # Kill rtl_sdr/rtl_fm 
process first if hasattr(app_module.current_process, '_rtl_process'): try: app_module.current_process._rtl_process.terminate() @@ -469,14 +580,14 @@ def stream() -> Response: keepalive_interval = 30.0 # Send keepalive every 30 seconds instead of 1 second while True: - try: - msg = app_module.output_queue.get(timeout=1) - last_keepalive = time.time() - try: - process_event('pager', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) + try: + msg = app_module.output_queue.get(timeout=1) + last_keepalive = time.time() + try: + process_event('pager', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= keepalive_interval: diff --git a/routes/sensor.py b/routes/sensor.py index e5a719e..c78e095 100644 --- a/routes/sensor.py +++ b/routes/sensor.py @@ -18,15 +18,20 @@ from utils.validation import ( validate_frequency, validate_device_index, validate_gain, validate_ppm, validate_rtl_tcp_host, validate_rtl_tcp_port ) -from utils.sse import format_sse -from utils.event_pipeline import process_event +from utils.sse import format_sse +from utils.event_pipeline import process_event from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType +from utils.dependencies import get_tool_path sensor_bp = Blueprint('sensor', __name__) # Track which device is being used sensor_active_device: int | None = None +# IQ pipeline stop event +sensor_iq_stop_event: threading.Event | None = None +# Companion rtl_sdr process when using IQ pipeline +sensor_rtl_process: subprocess.Popen | None = None def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: @@ -60,8 +65,26 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.sensor_queue.put({'type': 'error', 'text': str(e)}) finally: - global sensor_active_device - # Ensure process is terminated + global 
sensor_active_device, sensor_iq_stop_event, sensor_rtl_process + # Stop IQ pipeline if running + if sensor_iq_stop_event is not None: + sensor_iq_stop_event.set() + sensor_iq_stop_event = None + if app_module.waterfall_source == 'sensor': + app_module.waterfall_source = None + # Terminate companion rtl_sdr process + if sensor_rtl_process is not None: + try: + sensor_rtl_process.terminate() + sensor_rtl_process.wait(timeout=2) + except Exception: + try: + sensor_rtl_process.kill() + except Exception: + pass + unregister_process(sensor_rtl_process) + sensor_rtl_process = None + # Ensure decoder process is terminated try: process.terminate() process.wait(timeout=2) @@ -80,9 +103,32 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: sensor_active_device = None +def _cleanup_sensor_failed_start(rtl_process: subprocess.Popen | None) -> None: + """Clean up after a failed sensor start attempt.""" + global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process + if rtl_process: + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass + if sensor_iq_stop_event is not None: + sensor_iq_stop_event.set() + sensor_iq_stop_event = None + if app_module.waterfall_source == 'sensor': + app_module.waterfall_source = None + sensor_rtl_process = None + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device) + sensor_active_device = None + + @sensor_bp.route('/start_sensor', methods=['POST']) def start_sensor() -> Response: - global sensor_active_device + global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process with app_module.sensor_lock: if app_module.sensor_process: @@ -144,69 +190,187 @@ def start_sensor() -> Response: sdr_device = SDRFactory.create_default_device(sdr_type, index=device) builder = SDRFactory.get_builder(sdr_device.sdr_type) - - # Build ISM band decoder command bias_t = data.get('bias_t', False) - cmd = 
builder.build_ism_command( - device=sdr_device, - frequency_mhz=freq, - gain=float(gain) if gain and gain != 0 else None, - ppm=int(ppm) if ppm and ppm != 0 else None, - bias_t=bias_t + gain_val = float(gain) if gain and gain != 0 else None + ppm_val = int(ppm) if ppm and ppm != 0 else None + + # Determine if we can use IQ pipeline for live waterfall + use_iq_pipeline = ( + sdr_type == SDRType.RTL_SDR + and not rtl_tcp_host + and get_tool_path('rtl_sdr') is not None ) - full_cmd = ' '.join(cmd) - logger.info(f"Running: {full_cmd}") + if use_iq_pipeline: + # IQ pipeline: rtl_sdr -> Python IQ tee -> rtl_433 -r - + iq_sample_rate = 250000 # rtl_433 default - try: - app_module.sensor_process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE + rtl_cmd = builder.build_raw_capture_command( + device=sdr_device, + frequency_mhz=freq, + sample_rate=iq_sample_rate, + gain=gain_val, + ppm=ppm_val, + bias_t=bias_t, ) - register_process(app_module.sensor_process) - # Start output thread - thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,)) - thread.daemon = True - thread.start() + rtl_433_path = get_tool_path('rtl_433') or 'rtl_433' + decoder_cmd = [rtl_433_path, '-r', '-', '-s', str(iq_sample_rate), '-F', 'json'] - # Monitor stderr - def monitor_stderr(): - for line in app_module.sensor_process.stderr: - err = line.decode('utf-8', errors='replace').strip() - if err: - logger.debug(f"[rtl_433] {err}") - app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) + full_cmd = ' '.join(rtl_cmd) + ' | [iq_processor] | ' + ' '.join(decoder_cmd) + logger.info(f"Running (IQ pipeline): {full_cmd}") - stderr_thread = threading.Thread(target=monitor_stderr) - stderr_thread.daemon = True - stderr_thread.start() + try: + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + register_process(rtl_process) + sensor_rtl_process = rtl_process - 
app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) + # Monitor rtl_sdr stderr + def monitor_rtl_stderr(): + for line in rtl_process.stderr: + err = line.decode('utf-8', errors='replace').strip() + if err: + logger.debug(f"[rtl_sdr] {err}") + app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_sdr] {err}'}) - return jsonify({'status': 'started', 'command': full_cmd}) + threading.Thread(target=monitor_rtl_stderr, daemon=True).start() - except FileNotFoundError: - # Release device on failure - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) - except Exception as e: - # Release device on failure - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': str(e)}) + # Start rtl_433 reading from stdin + decoder_process = subprocess.Popen( + decoder_cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + register_process(decoder_process) + + # Start IQ processor thread + from routes.listening_post import waterfall_queue + from utils.iq_processor import run_passthrough_iq_pipeline + + stop_event = threading.Event() + sensor_iq_stop_event = stop_event + app_module.waterfall_source = 'sensor' + + iq_thread = threading.Thread( + target=run_passthrough_iq_pipeline, + args=( + rtl_process.stdout, + decoder_process.stdin, + waterfall_queue, + freq, + iq_sample_rate, + stop_event, + ), + daemon=True, + ) + iq_thread.start() + + app_module.sensor_process = decoder_process + + # Monitor rtl_433 stderr + def monitor_decoder_stderr(): + for line in decoder_process.stderr: + err = line.decode('utf-8', errors='replace').strip() + if err: + logger.debug(f"[rtl_433] {err}") + app_module.sensor_queue.put({'type': 'info', 'text': 
f'[rtl_433] {err}'}) + + threading.Thread(target=monitor_decoder_stderr, daemon=True).start() + + # Start output thread + thread = threading.Thread(target=stream_sensor_output, args=(decoder_process,), daemon=True) + thread.start() + + app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) + return jsonify({'status': 'started', 'command': full_cmd, 'waterfall_source': 'sensor'}) + + except FileNotFoundError: + _cleanup_sensor_failed_start(rtl_process) + return jsonify({'status': 'error', 'message': 'rtl_sdr or rtl_433 not found'}) + except Exception as e: + _cleanup_sensor_failed_start(rtl_process) + return jsonify({'status': 'error', 'message': str(e)}) + + else: + # Legacy pipeline: rtl_433 directly + cmd = builder.build_ism_command( + device=sdr_device, + frequency_mhz=freq, + gain=gain_val, + ppm=ppm_val, + bias_t=bias_t, + ) + + full_cmd = ' '.join(cmd) + logger.info(f"Running: {full_cmd}") + + try: + app_module.sensor_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + register_process(app_module.sensor_process) + + # Start output thread + thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,), daemon=True) + thread.start() + + # Monitor stderr + def monitor_stderr(): + for line in app_module.sensor_process.stderr: + err = line.decode('utf-8', errors='replace').strip() + if err: + logger.debug(f"[rtl_433] {err}") + app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) + + threading.Thread(target=monitor_stderr, daemon=True).start() + + app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) + return jsonify({'status': 'started', 'command': full_cmd}) + + except FileNotFoundError: + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device) + sensor_active_device = None + return jsonify({'status': 'error', 'message': 'rtl_433 not found. 
Install with: brew install rtl_433'}) + except Exception as e: + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device) + sensor_active_device = None + return jsonify({'status': 'error', 'message': str(e)}) @sensor_bp.route('/stop_sensor', methods=['POST']) def stop_sensor() -> Response: - global sensor_active_device + global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process with app_module.sensor_lock: if app_module.sensor_process: + # Stop IQ pipeline if running + if sensor_iq_stop_event is not None: + sensor_iq_stop_event.set() + sensor_iq_stop_event = None + if app_module.waterfall_source == 'sensor': + app_module.waterfall_source = None + + # Kill companion rtl_sdr process + if sensor_rtl_process is not None: + try: + sensor_rtl_process.terminate() + sensor_rtl_process.wait(timeout=2) + except (subprocess.TimeoutExpired, OSError): + try: + sensor_rtl_process.kill() + except OSError: + pass + sensor_rtl_process = None + app_module.sensor_process.terminate() try: app_module.sensor_process.wait(timeout=2) @@ -232,13 +396,13 @@ def stream_sensor() -> Response: while True: try: - msg = app_module.sensor_queue.get(timeout=1) - last_keepalive = time.time() - try: - process_event('sensor', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) + msg = app_module.sensor_queue.get(timeout=1) + last_keepalive = time.time() + try: + process_event('sensor', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= keepalive_interval: diff --git a/static/js/modes/listening-post.js b/static/js/modes/listening-post.js index 78c5010..b1b2b54 100644 --- a/static/js/modes/listening-post.js +++ b/static/js/modes/listening-post.js @@ -3589,7 +3589,8 @@ async function startWaterfall(options = {}) { lastWaterfallDraw = 0; initWaterfallCanvas(); connectWaterfallSSE(); - if (typeof reserveDevice === 'function') { + // Only reserve device if 
not decoder-driven (decoder already owns the device) + if (data.source !== 'decoder' && typeof reserveDevice === 'function') { reserveDevice(parseInt(device), 'waterfall'); } if (resume || resumeRfWaterfallAfterListening) { @@ -3618,11 +3619,14 @@ async function stopWaterfall() { } try { - await fetch('/listening/waterfall/stop', { method: 'POST' }); + const resp = await fetch('/listening/waterfall/stop', { method: 'POST' }); + let stopData = {}; + try { stopData = await resp.json(); } catch (e) {} isWaterfallRunning = false; if (waterfallEventSource) { waterfallEventSource.close(); waterfallEventSource = null; } setWaterfallControlButtons(false); - if (typeof releaseDevice === 'function') { + // Only release device if it was a standalone waterfall (not decoder-driven) + if (stopData.source !== 'decoder' && typeof releaseDevice === 'function') { releaseDevice('waterfall'); } } catch (err) { diff --git a/utils/dependencies.py b/utils/dependencies.py index 256b995..1fb57a5 100644 --- a/utils/dependencies.py +++ b/utils/dependencies.py @@ -121,6 +121,15 @@ TOOL_DEPENDENCIES = { 'manual': 'https://github.com/EliasOenal/multimon-ng' } }, + 'rtl_sdr': { + 'required': False, + 'description': 'Raw IQ capture for live waterfall during decoding', + 'install': { + 'apt': 'sudo apt install rtl-sdr', + 'brew': 'brew install librtlsdr', + 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' + } + }, 'rtl_test': { 'required': False, 'description': 'RTL-SDR device detection', @@ -143,6 +152,15 @@ TOOL_DEPENDENCIES = { 'brew': 'brew install rtl_433', 'manual': 'https://github.com/merbanan/rtl_433' } + }, + 'rtl_sdr': { + 'required': False, + 'description': 'Raw IQ capture for live waterfall during decoding', + 'install': { + 'apt': 'sudo apt install rtl-sdr', + 'brew': 'brew install librtlsdr', + 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' + } } } }, diff --git a/utils/iq_processor.py b/utils/iq_processor.py new file mode 100644 index 0000000..a6d1bfa --- /dev/null +++ 
b/utils/iq_processor.py @@ -0,0 +1,230 @@ +"""IQ processing pipelines for live waterfall during SDR decoding. + +Provides two pipeline functions: +- run_fm_iq_pipeline: FM demodulates IQ for pager decoding + FFT for waterfall +- run_passthrough_iq_pipeline: Passes raw IQ to rtl_433 + FFT for waterfall +""" + +from __future__ import annotations + +import logging +import struct +import threading +import queue +from datetime import datetime +from typing import IO, Optional + +import numpy as np + +logger = logging.getLogger('intercept.iq_processor') + +# FFT parameters +FFT_SIZE = 2048 +FFT_INTERVAL_SECONDS = 0.1 # ~10 updates/sec + + +def iq_to_complex(buf: bytes) -> np.ndarray: + """Convert raw uint8 IQ bytes to complex float samples. + + RTL-SDR outputs interleaved uint8 I/Q pairs centered at 127.5. + """ + raw = np.frombuffer(buf, dtype=np.uint8).astype(np.float32) + raw = (raw - 127.5) / 127.5 + return raw[0::2] + 1j * raw[1::2] + + +def compute_fft_bins(samples: np.ndarray, fft_size: int = FFT_SIZE) -> list[float]: + """Compute power spectral density in dB from complex IQ samples. + + Returns a list of power values (dB) for each frequency bin. 
+ """ + if len(samples) < fft_size: + # Pad with zeros if not enough samples + padded = np.zeros(fft_size, dtype=np.complex64) + padded[:len(samples)] = samples[:fft_size] + samples = padded + else: + samples = samples[:fft_size] + + # Apply Hanning window to reduce spectral leakage + window = np.hanning(fft_size).astype(np.float32) + windowed = samples * window + + # FFT and shift DC to center + spectrum = np.fft.fftshift(np.fft.fft(windowed)) + + # Power in dB (avoid log of zero) + power = np.abs(spectrum) ** 2 + power = np.maximum(power, 1e-20) + power_db = 10.0 * np.log10(power) + + return power_db.tolist() + + +def _push_waterfall(waterfall_queue: queue.Queue, bins: list[float], + center_freq_mhz: float, sample_rate: int) -> None: + """Push a waterfall sweep message to the queue.""" + half_span = (sample_rate / 1e6) / 2.0 + msg = { + 'type': 'waterfall_sweep', + 'start_freq': center_freq_mhz - half_span, + 'end_freq': center_freq_mhz + half_span, + 'bins': bins, + 'timestamp': datetime.now().isoformat(), + } + try: + waterfall_queue.put_nowait(msg) + except queue.Full: + # Drop oldest and retry + try: + waterfall_queue.get_nowait() + except queue.Empty: + pass + try: + waterfall_queue.put_nowait(msg) + except queue.Full: + pass + + +def run_fm_iq_pipeline( + iq_stdout: IO[bytes], + audio_stdin: IO[bytes], + waterfall_queue: queue.Queue, + center_freq_mhz: float, + sample_rate: int, + stop_event: threading.Event, +) -> None: + """FM demodulation pipeline: IQ -> FFT + FM demod -> 22050 Hz PCM. + + Reads raw uint8 IQ from rtl_sdr stdout, computes FFT for waterfall, + FM demodulates, decimates to 22050 Hz, and writes 16-bit PCM to + multimon-ng stdin. 
+ + Args: + iq_stdout: rtl_sdr stdout (raw uint8 IQ) + audio_stdin: multimon-ng stdin (16-bit PCM) + waterfall_queue: Queue for waterfall sweep messages + center_freq_mhz: Center frequency in MHz + sample_rate: IQ sample rate (should be 220500 for 10x decimation to 22050) + stop_event: Threading event to signal shutdown + """ + from scipy.signal import decimate as scipy_decimate + + # Decimation factor: sample_rate / 22050 + decim_factor = sample_rate // 22050 + if decim_factor < 1: + decim_factor = 1 + + # Read in chunks: ~100ms worth of IQ data (2 bytes per sample: I + Q) + chunk_bytes = int(sample_rate * FFT_INTERVAL_SECONDS) * 2 + # Align to even number of bytes (I/Q pairs) + chunk_bytes = (chunk_bytes // 2) * 2 + + # Previous sample for FM demod continuity + prev_sample = np.complex64(0) + + logger.info(f"FM IQ pipeline started: {center_freq_mhz} MHz, " + f"sr={sample_rate}, decim={decim_factor}") + + try: + while not stop_event.is_set(): + raw = iq_stdout.read(chunk_bytes) + if not raw: + break + + # Convert to complex IQ + iq = iq_to_complex(raw) + if len(iq) == 0: + continue + + # Compute FFT for waterfall + bins = compute_fft_bins(iq, FFT_SIZE) + _push_waterfall(waterfall_queue, bins, center_freq_mhz, sample_rate) + + # FM demodulation via instantaneous phase difference + # Prepend previous sample for continuity + iq_with_prev = np.concatenate(([prev_sample], iq)) + prev_sample = iq[-1] + + phase_diff = np.angle(iq_with_prev[1:] * np.conj(iq_with_prev[:-1])) + + # Decimate to 22050 Hz + if decim_factor > 1: + audio = scipy_decimate(phase_diff, decim_factor, ftype='fir') + else: + audio = phase_diff + + # Scale to 16-bit PCM range + audio = np.clip(audio * 10000, -32767, 32767).astype(np.int16) + + # Write to multimon-ng + try: + audio_stdin.write(audio.tobytes()) + audio_stdin.flush() + except (BrokenPipeError, OSError): + break + + except Exception as e: + logger.error(f"FM IQ pipeline error: {e}") + finally: + logger.info("FM IQ pipeline stopped") + try: 
+ audio_stdin.close() + except Exception: + pass + + +def run_passthrough_iq_pipeline( + iq_stdout: IO[bytes], + decoder_stdin: IO[bytes], + waterfall_queue: queue.Queue, + center_freq_mhz: float, + sample_rate: int, + stop_event: threading.Event, +) -> None: + """Passthrough pipeline: IQ -> FFT + raw bytes to decoder. + + Reads raw uint8 IQ from rtl_sdr stdout, computes FFT for waterfall, + and writes raw IQ bytes unchanged to rtl_433 stdin. + + Args: + iq_stdout: rtl_sdr stdout (raw uint8 IQ) + decoder_stdin: rtl_433 stdin (raw cu8 IQ) + waterfall_queue: Queue for waterfall sweep messages + center_freq_mhz: Center frequency in MHz + sample_rate: IQ sample rate (should be 250000 for rtl_433) + stop_event: Threading event to signal shutdown + """ + # Read in chunks: ~100ms worth of IQ data + chunk_bytes = int(sample_rate * FFT_INTERVAL_SECONDS) * 2 + chunk_bytes = (chunk_bytes // 2) * 2 + + logger.info(f"Passthrough IQ pipeline started: {center_freq_mhz} MHz, sr={sample_rate}") + + try: + while not stop_event.is_set(): + raw = iq_stdout.read(chunk_bytes) + if not raw: + break + + # Compute FFT for waterfall + iq = iq_to_complex(raw) + if len(iq) > 0: + bins = compute_fft_bins(iq, FFT_SIZE) + _push_waterfall(waterfall_queue, bins, center_freq_mhz, sample_rate) + + # Pass raw bytes unchanged to decoder + try: + decoder_stdin.write(raw) + decoder_stdin.flush() + except (BrokenPipeError, OSError): + break + + except Exception as e: + logger.error(f"Passthrough IQ pipeline error: {e}") + finally: + logger.info("Passthrough IQ pipeline stopped") + try: + decoder_stdin.close() + except Exception: + pass diff --git a/utils/sdr/base.py b/utils/sdr/base.py index 4dc79be..411d923 100644 --- a/utils/sdr/base.py +++ b/utils/sdr/base.py @@ -186,6 +186,36 @@ class CommandBuilder(ABC): """Return hardware capabilities for this SDR type.""" pass + def build_raw_capture_command( + self, + device: SDRDevice, + frequency_mhz: float, + sample_rate: int, + gain: Optional[float] = None, + 
ppm: Optional[int] = None, + bias_t: bool = False + ) -> list[str]: + """ + Build raw IQ capture command (for IQ-based waterfall during decoding). + + Args: + device: The SDR device to use + frequency_mhz: Center frequency in MHz + sample_rate: Sample rate in Hz + gain: Gain in dB (None for auto) + ppm: PPM frequency correction + bias_t: Enable bias-T power (for active antennas) + + Returns: + Command as list of strings for subprocess + + Raises: + NotImplementedError: If the SDR type does not support raw capture + """ + raise NotImplementedError( + f"Raw IQ capture not supported for {self.get_sdr_type().value}" + ) + @classmethod @abstractmethod def get_sdr_type(cls) -> SDRType: diff --git a/utils/sdr/rtlsdr.py b/utils/sdr/rtlsdr.py index 6d2b8d8..5de2551 100644 --- a/utils/sdr/rtlsdr.py +++ b/utils/sdr/rtlsdr.py @@ -197,6 +197,43 @@ class RTLSDRCommandBuilder(CommandBuilder): return cmd + def build_raw_capture_command( + self, + device: SDRDevice, + frequency_mhz: float, + sample_rate: int, + gain: Optional[float] = None, + ppm: Optional[int] = None, + bias_t: bool = False + ) -> list[str]: + """ + Build rtl_sdr command for raw IQ capture. + + Outputs raw uint8 IQ to stdout for processing by IQ pipelines. + """ + rtl_sdr_path = get_tool_path('rtl_sdr') or 'rtl_sdr' + freq_hz = int(frequency_mhz * 1e6) + cmd = [ + rtl_sdr_path, + '-d', self._get_device_arg(device), + '-f', str(freq_hz), + '-s', str(sample_rate), + ] + + if gain is not None and gain > 0: + cmd.extend(['-g', str(gain)]) + + if ppm is not None and ppm != 0: + cmd.extend(['-p', str(ppm)]) + + if bias_t: + cmd.extend(['-T']) + + # Output to stdout + cmd.append('-') + + return cmd + def build_ais_command( self, device: SDRDevice,