From beb38b6b9824d07ff4ff486a78696d0b29d113e9 Mon Sep 17 00:00:00 2001 From: Smittix Date: Sat, 7 Feb 2026 23:29:56 +0000 Subject: [PATCH] Remove waterfall from all modes except listening post Reverts IQ pipeline and removes syncWaterfallToFrequency calls from pager, sensor, rtlamr, DMR, SSTV, and SSTV general modes. Waterfall is now exclusive to listening post mode. Co-Authored-By: Claude Opus 4.6 --- app.py | 14 +- routes/listening_post.py | 17 -- routes/pager.py | 309 ++++++++++-------------------- routes/sensor.py | 278 ++++++--------------------- static/js/modes/dmr.js | 3 - static/js/modes/listening-post.js | 10 +- static/js/modes/sstv-general.js | 3 - static/js/modes/sstv.js | 21 +- templates/index.html | 40 +--- utils/dependencies.py | 18 -- utils/iq_processor.py | 230 ---------------------- utils/sdr/base.py | 30 --- utils/sdr/rtlsdr.py | 37 ---- 13 files changed, 172 insertions(+), 838 deletions(-) delete mode 100644 utils/iq_processor.py diff --git a/app.py b/app.py index 71386e4..cf1934a 100644 --- a/app.py +++ b/app.py @@ -232,13 +232,6 @@ cleanup_manager.register(ais_vessels) cleanup_manager.register(dsc_messages) cleanup_manager.register(deauth_alerts) -# ============================================ -# WATERFALL SOURCE TRACKING -# ============================================ -# Tracks whether waterfall data is being produced by a decoder's IQ pipeline -# None = no active source, 'rtl_power' = standalone, 'pager'/'sensor' = decoder-driven -waterfall_source: str | None = None - # ============================================ # SDR DEVICE REGISTRY # ============================================ @@ -666,7 +659,7 @@ def kill_all() -> Response: """Kill all decoder, WiFi, and Bluetooth processes.""" global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process - global dmr_process, dmr_rtl_process, waterfall_source + global dmr_process, dmr_rtl_process # Import adsb and ais modules to reset their state from routes import adsb as adsb_module @@ -675,7 +668,7 @@ def kill_all() -> Response: killed = [] processes_to_kill = [ - 'rtl_fm', 'multimon-ng', 'rtl_433', 'rtl_sdr', + 'rtl_fm', 'multimon-ng', 'rtl_433', 'airodump-ng', 'aireplay-ng', 'airmon-ng', 'dump1090', 'acarsdec', 'direwolf', 'AIS-catcher', 'hcitool', 'bluetoothctl', 'dsd', @@ -748,10 +741,9 @@ def kill_all() -> Response: except Exception: pass - # Clear SDR device registry and waterfall source + # Clear SDR device registry with sdr_device_registry_lock: sdr_device_registry.clear() - waterfall_source = None return jsonify({'status': 'killed', 'processes': killed}) diff --git a/routes/listening_post.py b/routes/listening_post.py index 3121ff5..5775641 100644 --- a/routes/listening_post.py +++ b/routes/listening_post.py @@ -1725,16 +1725,6 @@ def start_waterfall() -> Response: """Start the waterfall/spectrogram display.""" global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device - # Check if a decoder is already producing FFT data via IQ pipeline - if app_module.waterfall_source in ('pager', 'sensor'): - # Decoder-driven waterfall: data is already flowing into waterfall_queue - waterfall_running = True - return jsonify({ - 'status': 'started', - 'source': 'decoder', - 'decoder': app_module.waterfall_source, - }) - with waterfall_lock: if waterfall_running: return jsonify({'status': 'error', 'message': 'Waterfall already running'}), 409 @@ -1789,13 +1779,6 @@ def start_waterfall() -> Response: @listening_post_bp.route('/waterfall/stop', methods=['POST']) def stop_waterfall() -> Response: """Stop the waterfall display.""" - global waterfall_running - - # If waterfall is decoder-driven, just disconnect (don't stop the decoder) - if app_module.waterfall_source in ('pager', 'sensor'): - waterfall_running = False - return jsonify({'status': 'stopped', 'source': 'decoder'}) - _stop_waterfall_internal() return jsonify({'status': 'stopped'}) diff --git a/routes/pager.py b/routes/pager.py index ce5a8ce..4ee5425 100644 --- a/routes/pager.py +++ b/routes/pager.py @@ -22,8 +22,8 @@ from utils.validation import ( validate_frequency, validate_device_index, validate_gain, validate_ppm, validate_rtl_tcp_host, validate_rtl_tcp_port ) -from utils.sse import format_sse -from utils.event_pipeline import process_event +from utils.sse import format_sse +from utils.event_pipeline import process_event from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType, SDRValidationError from utils.dependencies import get_tool_path @@ -32,8 +32,6 @@ pager_bp = Blueprint('pager', __name__) # Track which device is being used pager_active_device: int | None = None -# IQ pipeline stop event (set to signal IQ processor thread to exit) -pager_iq_stop_event: threading.Event | None = None def parse_multimon_output(line: str) -> dict[str, str] | None: @@ -149,18 +147,12 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.output_queue.put({'type': 'error', 'text': str(e)}) finally: - global pager_active_device, pager_iq_stop_event + global pager_active_device try: os.close(master_fd) except OSError: pass - # Stop IQ pipeline if running - if pager_iq_stop_event is not None: - pager_iq_stop_event.set() - pager_iq_stop_event = None - if app_module.waterfall_source == 'pager': - app_module.waterfall_source = None - # Cleanup companion rtl_sdr/rtl_fm process and decoder + # Cleanup companion rtl_fm process and decoder with app_module.process_lock: rtl_proc = getattr(app_module.current_process, '_rtl_process', None) for proc in [rtl_proc, process]: @@ -183,28 +175,6 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: pager_active_device = None -def _cleanup_failed_start(rtl_process: subprocess.Popen | None) -> None: - """Clean up after a failed start attempt.""" - global pager_active_device, pager_iq_stop_event - if rtl_process: - try: - rtl_process.terminate() - rtl_process.wait(timeout=2) - except Exception: - try: - rtl_process.kill() - except Exception: - pass - if pager_iq_stop_event is not None: - pager_iq_stop_event.set() - pager_iq_stop_event = None - if app_module.waterfall_source == 'pager': - app_module.waterfall_source = None - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None - - @pager_bp.route('/start', methods=['POST']) def start_decoding() -> Response: global pager_active_device @@ -302,196 +272,115 @@ def start_decoding() -> Response: builder = SDRFactory.get_builder(sdr_device.sdr_type) + # Build FM demodulation command + bias_t = data.get('bias_t', False) + rtl_cmd = builder.build_fm_demod_command( + device=sdr_device, + frequency_mhz=freq, + sample_rate=22050, + gain=float(gain) if gain and gain != '0' else None, + ppm=int(ppm) if ppm and ppm != '0' else None, + modulation='fm', + squelch=squelch if squelch and squelch != 0 else None, + bias_t=bias_t + ) + multimon_path = get_tool_path('multimon-ng') if not multimon_path: - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None return jsonify({'status': 'error', 'message': 'multimon-ng not found'}), 400 multimon_cmd = [multimon_path, '-t', 'raw'] + decoders + ['-f', 'alpha', '-'] - bias_t = data.get('bias_t', False) - gain_val = float(gain) if gain and gain != '0' else None - ppm_val = int(ppm) if ppm and ppm != '0' else None + full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) + logger.info(f"Running: {full_cmd}") - # Determine if we can use IQ pipeline for live waterfall - use_iq_pipeline = ( - sdr_type == SDRType.RTL_SDR - and not rtl_tcp_host - and get_tool_path('rtl_sdr') is not None - ) - - if use_iq_pipeline: - # IQ pipeline: rtl_sdr -> Python IQ processor -> multimon-ng - iq_sample_rate = 220500 # 22050 * 10 for exact decimation - rtl_cmd = builder.build_raw_capture_command( - device=sdr_device, - frequency_mhz=freq, - sample_rate=iq_sample_rate, - gain=gain_val, - ppm=ppm_val, - bias_t=bias_t, + try: + # Create pipe: rtl_fm | multimon-ng + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE ) + register_process(rtl_process) - full_cmd = ' '.join(rtl_cmd) + ' | [iq_processor] | ' + ' '.join(multimon_cmd) - logger.info(f"Running (IQ pipeline): {full_cmd}") + # Start a thread to monitor rtl_fm stderr for errors + def monitor_rtl_stderr(): + for line in rtl_process.stderr: + err_text = line.decode('utf-8', errors='replace').strip() + if err_text: + logger.debug(f"[RTL_FM] {err_text}") + app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'}) - try: - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(rtl_process) + rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr) + rtl_stderr_thread.daemon = True + rtl_stderr_thread.start() - # Monitor rtl_sdr stderr - def monitor_rtl_stderr(): - for line in rtl_process.stderr: - err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - logger.debug(f"[rtl_sdr] {err_text}") - app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_sdr] {err_text}'}) + # Create a pseudo-terminal for multimon-ng output + master_fd, slave_fd = pty.openpty() - rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True) - rtl_stderr_thread.start() - - # Create PTY for multimon-ng output - master_fd, slave_fd = pty.openpty() - - multimon_process = subprocess.Popen( - multimon_cmd, - stdin=subprocess.PIPE, - stdout=slave_fd, - stderr=slave_fd, - close_fds=True, - ) - register_process(multimon_process) - - os.close(slave_fd) - - # Start IQ processor thread - from routes.listening_post import waterfall_queue - from utils.iq_processor import run_fm_iq_pipeline - - stop_event = threading.Event() - pager_iq_stop_event = stop_event - app_module.waterfall_source = 'pager' - - iq_thread = threading.Thread( - target=run_fm_iq_pipeline, - args=( - rtl_process.stdout, - multimon_process.stdin, - waterfall_queue, - freq, - iq_sample_rate, - stop_event, - ), - daemon=True, - ) - iq_thread.start() - - app_module.current_process = multimon_process - app_module.current_process._rtl_process = rtl_process - app_module.current_process._master_fd = master_fd - - # Start decoder output thread - thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process), daemon=True) - thread.start() - - app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - return jsonify({'status': 'started', 'command': full_cmd, 'waterfall_source': 'pager'}) - - except FileNotFoundError as e: - _cleanup_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) - except Exception as e: - _cleanup_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': str(e)}) - - else: - # Legacy pipeline: rtl_fm | multimon-ng - rtl_cmd = builder.build_fm_demod_command( - device=sdr_device, - frequency_mhz=freq, - sample_rate=22050, - gain=gain_val, - ppm=ppm_val, - modulation='fm', - squelch=squelch if squelch and squelch != 0 else None, - bias_t=bias_t, + multimon_process = subprocess.Popen( + multimon_cmd, + stdin=rtl_process.stdout, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True ) + register_process(multimon_process) - full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) - logger.info(f"Running: {full_cmd}") + os.close(slave_fd) + rtl_process.stdout.close() + app_module.current_process = multimon_process + app_module.current_process._rtl_process = rtl_process + app_module.current_process._master_fd = master_fd + + # Start output thread with PTY master fd + thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process)) + thread.daemon = True + thread.start() + + app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) + + return jsonify({'status': 'started', 'command': full_cmd}) + + except FileNotFoundError as e: + # Kill orphaned rtl_fm process try: - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(rtl_process) - - # Start a thread to monitor rtl_fm stderr for errors - def monitor_rtl_stderr(): - for line in rtl_process.stderr: - err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - logger.debug(f"[RTL_FM] {err_text}") - app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'}) - - rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True) - rtl_stderr_thread.start() - - # Create a pseudo-terminal for multimon-ng output - master_fd, slave_fd = pty.openpty() - - multimon_process = subprocess.Popen( - multimon_cmd, - stdin=rtl_process.stdout, - stdout=slave_fd, - stderr=slave_fd, - close_fds=True, - ) - register_process(multimon_process) - - os.close(slave_fd) - rtl_process.stdout.close() - - app_module.current_process = multimon_process - app_module.current_process._rtl_process = rtl_process - app_module.current_process._master_fd = master_fd - - # Start output thread with PTY master fd - thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process), daemon=True) - thread.start() - - app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - return jsonify({'status': 'started', 'command': full_cmd}) - - except FileNotFoundError as e: - _cleanup_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) - except Exception as e: - _cleanup_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': str(e)}) + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass + # Release device on failure + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device) + pager_active_device = None + return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) + except Exception as e: + # Kill orphaned rtl_fm process if it was started + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass + # Release device on failure + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device) + pager_active_device = None + return jsonify({'status': 'error', 'message': str(e)}) @pager_bp.route('/stop', methods=['POST']) def stop_decoding() -> Response: - global pager_active_device, pager_iq_stop_event + global pager_active_device with app_module.process_lock: if app_module.current_process: - # Stop IQ pipeline if running - if pager_iq_stop_event is not None: - pager_iq_stop_event.set() - pager_iq_stop_event = None - if app_module.waterfall_source == 'pager': - app_module.waterfall_source = None - - # Kill rtl_sdr/rtl_fm process first + # Kill rtl_fm process first if hasattr(app_module.current_process, '_rtl_process'): try: app_module.current_process._rtl_process.terminate() @@ -580,14 +469,14 @@ def stream() -> Response: keepalive_interval = 30.0 # Send keepalive every 30 seconds instead of 1 second while True: - try: - msg = app_module.output_queue.get(timeout=1) - last_keepalive = time.time() - try: - process_event('pager', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) + try: + msg = app_module.output_queue.get(timeout=1) + last_keepalive = time.time() + try: + process_event('pager', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= keepalive_interval: diff --git a/routes/sensor.py b/routes/sensor.py index c78e095..e5a719e 100644 --- a/routes/sensor.py +++ b/routes/sensor.py @@ -18,20 +18,15 @@ from utils.validation import ( validate_frequency, validate_device_index, validate_gain, validate_ppm, validate_rtl_tcp_host, validate_rtl_tcp_port ) -from utils.sse import format_sse -from utils.event_pipeline import process_event +from utils.sse import format_sse +from utils.event_pipeline import process_event from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType -from utils.dependencies import get_tool_path sensor_bp = Blueprint('sensor', __name__) # Track which device is being used sensor_active_device: int | None = None -# IQ pipeline stop event -sensor_iq_stop_event: threading.Event | None = None -# Companion rtl_sdr process when using IQ pipeline -sensor_rtl_process: subprocess.Popen | None = None def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: @@ -65,26 +60,8 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.sensor_queue.put({'type': 'error', 'text': str(e)}) finally: - global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process - # Stop IQ pipeline if running - if sensor_iq_stop_event is not None: - sensor_iq_stop_event.set() - sensor_iq_stop_event = None - if app_module.waterfall_source == 'sensor': - app_module.waterfall_source = None - # Terminate companion rtl_sdr process - if sensor_rtl_process is not None: - try: - sensor_rtl_process.terminate() - sensor_rtl_process.wait(timeout=2) - except Exception: - try: - sensor_rtl_process.kill() - except Exception: - pass - unregister_process(sensor_rtl_process) - sensor_rtl_process = None - # Ensure decoder process is terminated + global sensor_active_device + # Ensure process is terminated try: process.terminate() process.wait(timeout=2) @@ -103,32 +80,9 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: sensor_active_device = None -def _cleanup_sensor_failed_start(rtl_process: subprocess.Popen | None) -> None: - """Clean up after a failed sensor start attempt.""" - global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process - if rtl_process: - try: - rtl_process.terminate() - rtl_process.wait(timeout=2) - except Exception: - try: - rtl_process.kill() - except Exception: - pass - if sensor_iq_stop_event is not None: - sensor_iq_stop_event.set() - sensor_iq_stop_event = None - if app_module.waterfall_source == 'sensor': - app_module.waterfall_source = None - sensor_rtl_process = None - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - - @sensor_bp.route('/start_sensor', methods=['POST']) def start_sensor() -> Response: - global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process + global sensor_active_device with app_module.sensor_lock: if app_module.sensor_process: @@ -190,187 +144,69 @@ def start_sensor() -> Response: sdr_device = SDRFactory.create_default_device(sdr_type, index=device) builder = SDRFactory.get_builder(sdr_device.sdr_type) - bias_t = data.get('bias_t', False) - gain_val = float(gain) if gain and gain != 0 else None - ppm_val = int(ppm) if ppm and ppm != 0 else None - # Determine if we can use IQ pipeline for live waterfall - use_iq_pipeline = ( - sdr_type == SDRType.RTL_SDR - and not rtl_tcp_host - and get_tool_path('rtl_sdr') is not None + # Build ISM band decoder command + bias_t = data.get('bias_t', False) + cmd = builder.build_ism_command( + device=sdr_device, + frequency_mhz=freq, + gain=float(gain) if gain and gain != 0 else None, + ppm=int(ppm) if ppm and ppm != 0 else None, + bias_t=bias_t ) - if use_iq_pipeline: - # IQ pipeline: rtl_sdr -> Python IQ tee -> rtl_433 -r - - iq_sample_rate = 250000 # rtl_433 default + full_cmd = ' '.join(cmd) + logger.info(f"Running: {full_cmd}") - rtl_cmd = builder.build_raw_capture_command( - device=sdr_device, - frequency_mhz=freq, - sample_rate=iq_sample_rate, - gain=gain_val, - ppm=ppm_val, - bias_t=bias_t, + try: + app_module.sensor_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE ) + register_process(app_module.sensor_process) - rtl_433_path = get_tool_path('rtl_433') or 'rtl_433' - decoder_cmd = [rtl_433_path, '-r', '-', '-s', str(iq_sample_rate), '-F', 'json'] + # Start output thread + thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,)) + thread.daemon = True + thread.start() - full_cmd = ' '.join(rtl_cmd) + ' | [iq_processor] | ' + ' '.join(decoder_cmd) - logger.info(f"Running (IQ pipeline): {full_cmd}") + # Monitor stderr + def monitor_stderr(): + for line in app_module.sensor_process.stderr: + err = line.decode('utf-8', errors='replace').strip() + if err: + logger.debug(f"[rtl_433] {err}") + app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) - try: - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(rtl_process) - sensor_rtl_process = rtl_process + stderr_thread = threading.Thread(target=monitor_stderr) + stderr_thread.daemon = True + stderr_thread.start() - # Monitor rtl_sdr stderr - def monitor_rtl_stderr(): - for line in rtl_process.stderr: - err = line.decode('utf-8', errors='replace').strip() - if err: - logger.debug(f"[rtl_sdr] {err}") - app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_sdr] {err}'}) + app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - threading.Thread(target=monitor_rtl_stderr, daemon=True).start() + return jsonify({'status': 'started', 'command': full_cmd}) - # Start rtl_433 reading from stdin - decoder_process = subprocess.Popen( - decoder_cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(decoder_process) - - # Start IQ processor thread - from routes.listening_post import waterfall_queue - from utils.iq_processor import run_passthrough_iq_pipeline - - stop_event = threading.Event() - sensor_iq_stop_event = stop_event - app_module.waterfall_source = 'sensor' - - iq_thread = threading.Thread( - target=run_passthrough_iq_pipeline, - args=( - rtl_process.stdout, - decoder_process.stdin, - waterfall_queue, - freq, - iq_sample_rate, - stop_event, - ), - daemon=True, - ) - iq_thread.start() - - app_module.sensor_process = decoder_process - - # Monitor rtl_433 stderr - def monitor_decoder_stderr(): - for line in decoder_process.stderr: - err = line.decode('utf-8', errors='replace').strip() - if err: - logger.debug(f"[rtl_433] {err}") - app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) - - threading.Thread(target=monitor_decoder_stderr, daemon=True).start() - - # Start output thread - thread = threading.Thread(target=stream_sensor_output, args=(decoder_process,), daemon=True) - thread.start() - - app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - return jsonify({'status': 'started', 'command': full_cmd, 'waterfall_source': 'sensor'}) - - except FileNotFoundError: - _cleanup_sensor_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': 'rtl_sdr or rtl_433 not found'}) - except Exception as e: - _cleanup_sensor_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': str(e)}) - - else: - # Legacy pipeline: rtl_433 directly - cmd = builder.build_ism_command( - device=sdr_device, - frequency_mhz=freq, - gain=gain_val, - ppm=ppm_val, - bias_t=bias_t, - ) - - full_cmd = ' '.join(cmd) - logger.info(f"Running: {full_cmd}") - - try: - app_module.sensor_process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(app_module.sensor_process) - - # Start output thread - thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,), daemon=True) - thread.start() - - # Monitor stderr - def monitor_stderr(): - for line in app_module.sensor_process.stderr: - err = line.decode('utf-8', errors='replace').strip() - if err: - logger.debug(f"[rtl_433] {err}") - app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) - - threading.Thread(target=monitor_stderr, daemon=True).start() - - app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - return jsonify({'status': 'started', 'command': full_cmd}) - - except FileNotFoundError: - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) - except Exception as e: - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': str(e)}) + except FileNotFoundError: + # Release device on failure + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device) + sensor_active_device = None + return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) + except Exception as e: + # Release device on failure + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device) + sensor_active_device = None + return jsonify({'status': 'error', 'message': str(e)}) @sensor_bp.route('/stop_sensor', methods=['POST']) def stop_sensor() -> Response: - global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process + global sensor_active_device with app_module.sensor_lock: if app_module.sensor_process: - # Stop IQ pipeline if running - if sensor_iq_stop_event is not None: - sensor_iq_stop_event.set() - sensor_iq_stop_event = None - if app_module.waterfall_source == 'sensor': - app_module.waterfall_source = None - - # Kill companion rtl_sdr process - if sensor_rtl_process is not None: - try: - sensor_rtl_process.terminate() - sensor_rtl_process.wait(timeout=2) - except (subprocess.TimeoutExpired, OSError): - try: - sensor_rtl_process.kill() - except OSError: - pass - sensor_rtl_process = None - app_module.sensor_process.terminate() try: app_module.sensor_process.wait(timeout=2) @@ -396,13 +232,13 @@ def stream_sensor() -> Response: while True: try: - msg = app_module.sensor_queue.get(timeout=1) - last_keepalive = time.time() - try: - process_event('sensor', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) + msg = app_module.sensor_queue.get(timeout=1) + last_keepalive = time.time() + try: + process_event('sensor', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= keepalive_interval: diff --git a/static/js/modes/dmr.js b/static/js/modes/dmr.js index 5e1675d..447afaa 100644 --- a/static/js/modes/dmr.js +++ b/static/js/modes/dmr.js @@ -88,9 +88,6 @@ function startDmr() { if (typeof reserveDevice === 'function') { reserveDevice(parseInt(device), 'dmr'); } - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true }); - } if (typeof showNotification === 'function') { showNotification('DMR', `Decoding ${frequency} MHz (${protocol.toUpperCase()})`); } diff --git a/static/js/modes/listening-post.js b/static/js/modes/listening-post.js index b1b2b54..78c5010 100644 --- a/static/js/modes/listening-post.js +++ b/static/js/modes/listening-post.js @@ -3589,8 +3589,7 @@ async function startWaterfall(options = {}) { lastWaterfallDraw = 0; initWaterfallCanvas(); connectWaterfallSSE(); - // Only reserve device if not decoder-driven (decoder already owns the device) - if (data.source !== 'decoder' && typeof reserveDevice === 'function') { + if (typeof reserveDevice === 'function') { reserveDevice(parseInt(device), 'waterfall'); } if (resume || resumeRfWaterfallAfterListening) { @@ -3619,14 +3618,11 @@ async function stopWaterfall() { } try { - const resp = await fetch('/listening/waterfall/stop', { method: 'POST' }); - let stopData = {}; - try { stopData = await resp.json(); } catch (e) {} + await fetch('/listening/waterfall/stop', { method: 'POST' }); isWaterfallRunning = false; if (waterfallEventSource) { waterfallEventSource.close(); waterfallEventSource = null; } setWaterfallControlButtons(false); - // Only release device if it was a standalone waterfall (not decoder-driven) - if (stopData.source !== 'decoder' && typeof releaseDevice === 'function') { + if (typeof releaseDevice === 'function') { releaseDevice('waterfall'); } } catch (err) { diff --git a/static/js/modes/sstv-general.js b/static/js/modes/sstv-general.js index 1029334..0b89efe 100644 --- a/static/js/modes/sstv-general.js +++ b/static/js/modes/sstv-general.js @@ -98,9 +98,6 @@ const SSTVGeneral = (function() { updateStatusUI('listening', `${frequency} MHz ${modulation.toUpperCase()}`); startStream(); showNotification('SSTV', `Listening on ${frequency} MHz ${modulation.toUpperCase()}`); - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true }); - } // Update strip const stripFreq = document.getElementById('sstvGeneralStripFreq'); diff --git a/static/js/modes/sstv.js b/static/js/modes/sstv.js index ea9d9e6..6bafdb0 100644 --- a/static/js/modes/sstv.js +++ b/static/js/modes/sstv.js @@ -537,18 +537,15 @@ const SSTV = (function() { const data = await response.json(); - if (data.status === 'started' || data.status === 'already_running') { - isRunning = true; - if (typeof reserveDevice === 'function') { - reserveDevice(device, 'sstv'); - } - updateStatusUI('listening', `${frequency} MHz`); - startStream(); - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true }); - } - showNotification('SSTV', `Listening on ${frequency} MHz`); - } else { + if (data.status === 'started' || data.status === 'already_running') { + isRunning = true; + if (typeof reserveDevice === 'function') { + reserveDevice(device, 'sstv'); + } + updateStatusUI('listening', `${frequency} MHz`); + startStream(); + showNotification('SSTV', `Listening on ${frequency} MHz`); + } else { updateStatusUI('idle', 'Start failed'); showStatusMessage(data.message || 'Failed to start decoder', 'error'); } diff --git a/templates/index.html b/templates/index.html index 7982c6f..25131ed 100644 --- a/templates/index.html +++ b/templates/index.html @@ -2859,23 +2859,6 @@ } } - function getModeWaterfallFrequency(mode) { - const lookup = { - pager: 'frequency', - sensor: 'sensorFrequency', - rtlamr: 'rtlamrFrequency', - dmr: 'dmrFrequency', - sstv: 'sstvFrequency', - sstv_general: 'sstvGeneralFrequency', - listening: 'radioScanStart' - }; - const id = lookup[mode]; - if (!id) return NaN; - const el = document.getElementById(id); - const value = parseFloat(el?.value); - return Number.isFinite(value) ? value : NaN; - } - // Mode switching function switchMode(mode, options = {}) { const { updateUrl = true } = options; @@ -3079,19 +3062,13 @@ // Show shared waterfall controls for supported modes const waterfallControlsSection = document.getElementById('waterfallControlsSection'); const waterfallPanel = document.getElementById('waterfallPanel'); - const waterfallModes = ['pager', 'sensor', 'rtlamr', 'dmr', 'sstv', 'sstv_general', 'listening']; + const waterfallModes = ['listening']; const waterfallSupported = waterfallModes.includes(mode); if (waterfallControlsSection) waterfallControlsSection.style.display = waterfallSupported ? 'block' : 'none'; if (waterfallPanel) { const running = (typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning); waterfallPanel.style.display = (waterfallSupported && running) ? 'block' : 'none'; } - if (waterfallSupported && typeof syncWaterfallToFrequency === 'function' && typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning) { - const modeFreq = getModeWaterfallFrequency(mode); - if (Number.isFinite(modeFreq)) { - syncWaterfallToFrequency(modeFreq, { autoStart: true, restartIfRunning: true, silent: true }); - } - } // Toggle mode-specific tool status displays const toolStatusPager = document.getElementById('toolStatusPager'); @@ -3187,9 +3164,6 @@ // Sensor frequency function setSensorFreq(freq) { document.getElementById('sensorFrequency').value = freq; - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning }); - } if (isSensorRunning) { fetch('/stop_sensor', { method: 'POST' }) .then(() => setTimeout(() => startSensorDecoding(), 500)); @@ -3274,9 +3248,6 @@ reserveDevice(parseInt(device), 'sensor'); setSensorRunning(true); startSensorStream(); - if (!remoteConfig && typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true }); - } // Initialize sensor filter bar const filterContainer = document.getElementById('filterBarContainer'); @@ -3530,9 +3501,6 @@ function setRtlamrFreq(freq) { document.getElementById('rtlamrFrequency').value = freq; - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning }); - } } // RTLAMR mode polling timer for agent mode @@ -3588,9 +3556,6 @@ } setRtlamrRunning(true); startRtlamrStream(isAgentMode); - if (!isAgentMode && typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true }); - } // Initialize meter filter bar (reuse sensor filter bar since same structure) const filterContainer = document.getElementById('filterBarContainer'); @@ -4362,9 +4327,6 @@ } setRunning(true); startStream(isAgentMode); - if (!isAgentMode && !remoteConfig && typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true }); - } // Initialize filter bar const filterContainer = document.getElementById('filterBarContainer'); diff --git a/utils/dependencies.py b/utils/dependencies.py index 1fb57a5..256b995 100644 --- a/utils/dependencies.py +++ b/utils/dependencies.py @@ -121,15 +121,6 @@ TOOL_DEPENDENCIES = { 'manual': 'https://github.com/EliasOenal/multimon-ng' } }, - 'rtl_sdr': { - 'required': False, - 'description': 'Raw IQ capture for live waterfall during decoding', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } - }, 'rtl_test': { 'required': False, 'description': 'RTL-SDR device detection', @@ -152,15 +143,6 @@ TOOL_DEPENDENCIES = { 'brew': 'brew install rtl_433', 'manual': 'https://github.com/merbanan/rtl_433' } - }, - 'rtl_sdr': { - 'required': False, - 'description': 'Raw IQ capture for live waterfall during decoding', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } } } }, diff --git a/utils/iq_processor.py b/utils/iq_processor.py deleted file mode 100644 index a6d1bfa..0000000 --- a/utils/iq_processor.py +++ /dev/null @@ -1,230 +0,0 @@ -"""IQ processing pipelines for live waterfall during SDR decoding. - -Provides two pipeline functions: -- run_fm_iq_pipeline: FM demodulates IQ for pager decoding + FFT for waterfall -- run_passthrough_iq_pipeline: Passes raw IQ to rtl_433 + FFT for waterfall -""" - -from __future__ import annotations - -import logging -import struct -import threading -import queue -from datetime import datetime -from typing import IO, Optional - -import numpy as np - -logger = logging.getLogger('intercept.iq_processor') - -# FFT parameters -FFT_SIZE = 2048 -FFT_INTERVAL_SECONDS = 0.1 # ~10 updates/sec - - -def iq_to_complex(buf: bytes) -> np.ndarray: - """Convert raw uint8 IQ bytes to complex float samples. - - RTL-SDR outputs interleaved uint8 I/Q pairs centered at 127.5. - """ - raw = np.frombuffer(buf, dtype=np.uint8).astype(np.float32) - raw = (raw - 127.5) / 127.5 - return raw[0::2] + 1j * raw[1::2] - - -def compute_fft_bins(samples: np.ndarray, fft_size: int = FFT_SIZE) -> list[float]: - """Compute power spectral density in dB from complex IQ samples. - - Returns a list of power values (dB) for each frequency bin. - """ - if len(samples) < fft_size: - # Pad with zeros if not enough samples - padded = np.zeros(fft_size, dtype=np.complex64) - padded[:len(samples)] = samples[:fft_size] - samples = padded - else: - samples = samples[:fft_size] - - # Apply Hanning window to reduce spectral leakage - window = np.hanning(fft_size).astype(np.float32) - windowed = samples * window - - # FFT and shift DC to center - spectrum = np.fft.fftshift(np.fft.fft(windowed)) - - # Power in dB (avoid log of zero) - power = np.abs(spectrum) ** 2 - power = np.maximum(power, 1e-20) - power_db = 10.0 * np.log10(power) - - return power_db.tolist() - - -def _push_waterfall(waterfall_queue: queue.Queue, bins: list[float], - center_freq_mhz: float, sample_rate: int) -> None: - """Push a waterfall sweep message to the queue.""" - half_span = (sample_rate / 1e6) / 2.0 - msg = { - 'type': 'waterfall_sweep', - 'start_freq': center_freq_mhz - half_span, - 'end_freq': center_freq_mhz + half_span, - 'bins': bins, - 'timestamp': datetime.now().isoformat(), - } - try: - waterfall_queue.put_nowait(msg) - except queue.Full: - # Drop oldest and retry - try: - waterfall_queue.get_nowait() - except queue.Empty: - pass - try: - waterfall_queue.put_nowait(msg) - except queue.Full: - pass - - -def run_fm_iq_pipeline( - iq_stdout: IO[bytes], - audio_stdin: IO[bytes], - waterfall_queue: queue.Queue, - center_freq_mhz: float, - sample_rate: int, - stop_event: threading.Event, -) -> None: - """FM demodulation pipeline: IQ -> FFT + FM demod -> 22050 Hz PCM. - - Reads raw uint8 IQ from rtl_sdr stdout, computes FFT for waterfall, - FM demodulates, decimates to 22050 Hz, and writes 16-bit PCM to - multimon-ng stdin. - - Args: - iq_stdout: rtl_sdr stdout (raw uint8 IQ) - audio_stdin: multimon-ng stdin (16-bit PCM) - waterfall_queue: Queue for waterfall sweep messages - center_freq_mhz: Center frequency in MHz - sample_rate: IQ sample rate (should be 220500 for 10x decimation to 22050) - stop_event: Threading event to signal shutdown - """ - from scipy.signal import decimate as scipy_decimate - - # Decimation factor: sample_rate / 22050 - decim_factor = sample_rate // 22050 - if decim_factor < 1: - decim_factor = 1 - - # Read in chunks: ~100ms worth of IQ data (2 bytes per sample: I + Q) - chunk_bytes = int(sample_rate * FFT_INTERVAL_SECONDS) * 2 - # Align to even number of bytes (I/Q pairs) - chunk_bytes = (chunk_bytes // 2) * 2 - - # Previous sample for FM demod continuity - prev_sample = np.complex64(0) - - logger.info(f"FM IQ pipeline started: {center_freq_mhz} MHz, " - f"sr={sample_rate}, decim={decim_factor}") - - try: - while not stop_event.is_set(): - raw = iq_stdout.read(chunk_bytes) - if not raw: - break - - # Convert to complex IQ - iq = iq_to_complex(raw) - if len(iq) == 0: - continue - - # Compute FFT for waterfall - bins = compute_fft_bins(iq, FFT_SIZE) - _push_waterfall(waterfall_queue, bins, center_freq_mhz, sample_rate) - - # FM demodulation via instantaneous phase difference - # Prepend previous sample for continuity - iq_with_prev = np.concatenate(([prev_sample], iq)) - prev_sample = iq[-1] - - phase_diff = np.angle(iq_with_prev[1:] * np.conj(iq_with_prev[:-1])) - - # Decimate to 22050 Hz - if decim_factor > 1: - audio = scipy_decimate(phase_diff, decim_factor, ftype='fir') - else: - audio = phase_diff - - # Scale to 16-bit PCM range - audio = np.clip(audio * 10000, -32767, 32767).astype(np.int16) - - # Write to multimon-ng - try: - audio_stdin.write(audio.tobytes()) - audio_stdin.flush() - except (BrokenPipeError, OSError): - break - - except Exception as e: - logger.error(f"FM IQ pipeline error: {e}") - finally: - logger.info("FM IQ pipeline stopped") - try: - audio_stdin.close() - except Exception: - pass - - -def run_passthrough_iq_pipeline( - iq_stdout: IO[bytes], - decoder_stdin: IO[bytes], - waterfall_queue: queue.Queue, - center_freq_mhz: float, - sample_rate: int, - stop_event: threading.Event, -) -> None: - """Passthrough pipeline: IQ -> FFT + raw bytes to decoder. - - Reads raw uint8 IQ from rtl_sdr stdout, computes FFT for waterfall, - and writes raw IQ bytes unchanged to rtl_433 stdin. - - Args: - iq_stdout: rtl_sdr stdout (raw uint8 IQ) - decoder_stdin: rtl_433 stdin (raw cu8 IQ) - waterfall_queue: Queue for waterfall sweep messages - center_freq_mhz: Center frequency in MHz - sample_rate: IQ sample rate (should be 250000 for rtl_433) - stop_event: Threading event to signal shutdown - """ - # Read in chunks: ~100ms worth of IQ data - chunk_bytes = int(sample_rate * FFT_INTERVAL_SECONDS) * 2 - chunk_bytes = (chunk_bytes // 2) * 2 - - logger.info(f"Passthrough IQ pipeline started: {center_freq_mhz} MHz, sr={sample_rate}") - - try: - while not stop_event.is_set(): - raw = iq_stdout.read(chunk_bytes) - if not raw: - break - - # Compute FFT for waterfall - iq = iq_to_complex(raw) - if len(iq) > 0: - bins = compute_fft_bins(iq, FFT_SIZE) - _push_waterfall(waterfall_queue, bins, center_freq_mhz, sample_rate) - - # Pass raw bytes unchanged to decoder - try: - decoder_stdin.write(raw) - decoder_stdin.flush() - except (BrokenPipeError, OSError): - break - - except Exception as e: - logger.error(f"Passthrough IQ pipeline error: {e}") - finally: - logger.info("Passthrough IQ pipeline stopped") - try: - decoder_stdin.close() - except Exception: - pass diff --git a/utils/sdr/base.py b/utils/sdr/base.py index 411d923..4dc79be 100644 --- a/utils/sdr/base.py +++ b/utils/sdr/base.py @@ -186,36 +186,6 @@ class CommandBuilder(ABC): """Return hardware capabilities for this SDR type.""" pass - def build_raw_capture_command( - self, - device: SDRDevice, - frequency_mhz: float, - sample_rate: int, - gain: Optional[float] = None, - ppm: Optional[int] = None, - bias_t: bool = False - ) -> list[str]: - """ - Build raw IQ capture command (for IQ-based waterfall during decoding). - - Args: - device: The SDR device to use - frequency_mhz: Center frequency in MHz - sample_rate: Sample rate in Hz - gain: Gain in dB (None for auto) - ppm: PPM frequency correction - bias_t: Enable bias-T power (for active antennas) - - Returns: - Command as list of strings for subprocess - - Raises: - NotImplementedError: If the SDR type does not support raw capture - """ - raise NotImplementedError( - f"Raw IQ capture not supported for {self.get_sdr_type().value}" - ) - @classmethod @abstractmethod def get_sdr_type(cls) -> SDRType: diff --git a/utils/sdr/rtlsdr.py b/utils/sdr/rtlsdr.py index 5de2551..6d2b8d8 100644 --- a/utils/sdr/rtlsdr.py +++ b/utils/sdr/rtlsdr.py @@ -197,43 +197,6 @@ class RTLSDRCommandBuilder(CommandBuilder): return cmd - def build_raw_capture_command( - self, - device: SDRDevice, - frequency_mhz: float, - sample_rate: int, - gain: Optional[float] = None, - ppm: Optional[int] = None, - bias_t: bool = False - ) -> list[str]: - """ - Build rtl_sdr command for raw IQ capture. - - Outputs raw uint8 IQ to stdout for processing by IQ pipelines. - """ - rtl_sdr_path = get_tool_path('rtl_sdr') or 'rtl_sdr' - freq_hz = int(frequency_mhz * 1e6) - cmd = [ - rtl_sdr_path, - '-d', self._get_device_arg(device), - '-f', str(freq_hz), - '-s', str(sample_rate), - ] - - if gain is not None and gain > 0: - cmd.extend(['-g', str(gain)]) - - if ppm is not None and ppm != 0: - cmd.extend(['-p', str(ppm)]) - - if bias_t: - cmd.extend(['-T']) - - # Output to stdout - cmd.append('-') - - return cmd - def build_ais_command( self, device: SDRDevice,