diff --git a/app.py b/app.py index 71386e4..cf1934a 100644 --- a/app.py +++ b/app.py @@ -232,13 +232,6 @@ cleanup_manager.register(ais_vessels) cleanup_manager.register(dsc_messages) cleanup_manager.register(deauth_alerts) -# ============================================ -# WATERFALL SOURCE TRACKING -# ============================================ -# Tracks whether waterfall data is being produced by a decoder's IQ pipeline -# None = no active source, 'rtl_power' = standalone, 'pager'/'sensor' = decoder-driven -waterfall_source: str | None = None - # ============================================ # SDR DEVICE REGISTRY # ============================================ @@ -666,7 +659,7 @@ def kill_all() -> Response: """Kill all decoder, WiFi, and Bluetooth processes.""" global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process - global dmr_process, dmr_rtl_process, waterfall_source + global dmr_process, dmr_rtl_process # Import adsb and ais modules to reset their state from routes import adsb as adsb_module @@ -675,7 +668,7 @@ def kill_all() -> Response: killed = [] processes_to_kill = [ - 'rtl_fm', 'multimon-ng', 'rtl_433', 'rtl_sdr', + 'rtl_fm', 'multimon-ng', 'rtl_433', 'airodump-ng', 'aireplay-ng', 'airmon-ng', 'dump1090', 'acarsdec', 'direwolf', 'AIS-catcher', 'hcitool', 'bluetoothctl', 'dsd', @@ -748,10 +741,9 @@ def kill_all() -> Response: except Exception: pass - # Clear SDR device registry and waterfall source + # Clear SDR device registry with sdr_device_registry_lock: sdr_device_registry.clear() - waterfall_source = None return jsonify({'status': 'killed', 'processes': killed}) diff --git a/routes/listening_post.py b/routes/listening_post.py index 3121ff5..5775641 100644 --- a/routes/listening_post.py +++ b/routes/listening_post.py @@ -1725,16 +1725,6 @@ def start_waterfall() -> Response: """Start the waterfall/spectrogram display.""" global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device - # Check if a decoder is already producing FFT data via IQ pipeline - if app_module.waterfall_source in ('pager', 'sensor'): - # Decoder-driven waterfall: data is already flowing into waterfall_queue - waterfall_running = True - return jsonify({ - 'status': 'started', - 'source': 'decoder', - 'decoder': app_module.waterfall_source, - }) - with waterfall_lock: if waterfall_running: return jsonify({'status': 'error', 'message': 'Waterfall already running'}), 409 @@ -1789,13 +1779,6 @@ def start_waterfall() -> Response: @listening_post_bp.route('/waterfall/stop', methods=['POST']) def stop_waterfall() -> Response: """Stop the waterfall display.""" - global waterfall_running - - # If waterfall is decoder-driven, just disconnect (don't stop the decoder) - if app_module.waterfall_source in ('pager', 'sensor'): - waterfall_running = False - return jsonify({'status': 'stopped', 'source': 'decoder'}) - _stop_waterfall_internal() return jsonify({'status': 'stopped'}) diff --git a/routes/pager.py b/routes/pager.py index ce5a8ce..4ee5425 100644 --- a/routes/pager.py +++ b/routes/pager.py @@ -22,8 +22,8 @@ from utils.validation import ( validate_frequency, validate_device_index, validate_gain, validate_ppm, validate_rtl_tcp_host, validate_rtl_tcp_port ) -from utils.sse import format_sse -from utils.event_pipeline import process_event +from utils.sse import format_sse +from utils.event_pipeline import process_event from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType, SDRValidationError from utils.dependencies import get_tool_path @@ -32,8 +32,6 @@ pager_bp = Blueprint('pager', __name__) # Track which device is being used pager_active_device: int | None = None -# IQ pipeline stop event (set to signal IQ processor thread to exit) -pager_iq_stop_event: threading.Event | None = None def parse_multimon_output(line: str) -> dict[str, str] | None: @@ -149,18 +147,12 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.output_queue.put({'type': 'error', 'text': str(e)}) finally: - global pager_active_device, pager_iq_stop_event + global pager_active_device try: os.close(master_fd) except OSError: pass - # Stop IQ pipeline if running - if pager_iq_stop_event is not None: - pager_iq_stop_event.set() - pager_iq_stop_event = None - if app_module.waterfall_source == 'pager': - app_module.waterfall_source = None - # Cleanup companion rtl_sdr/rtl_fm process and decoder + # Cleanup companion rtl_fm process and decoder with app_module.process_lock: rtl_proc = getattr(app_module.current_process, '_rtl_process', None) for proc in [rtl_proc, process]: @@ -183,28 +175,6 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: pager_active_device = None -def _cleanup_failed_start(rtl_process: subprocess.Popen | None) -> None: - """Clean up after a failed start attempt.""" - global pager_active_device, pager_iq_stop_event - if rtl_process: - try: - rtl_process.terminate() - rtl_process.wait(timeout=2) - except Exception: - try: - rtl_process.kill() - except Exception: - pass - if pager_iq_stop_event is not None: - pager_iq_stop_event.set() - pager_iq_stop_event = None - if app_module.waterfall_source == 'pager': - app_module.waterfall_source = None - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None - - @pager_bp.route('/start', methods=['POST']) def start_decoding() -> Response: global pager_active_device @@ -302,196 +272,115 @@ def start_decoding() -> Response: builder = SDRFactory.get_builder(sdr_device.sdr_type) + # Build FM demodulation command + bias_t = data.get('bias_t', False) + rtl_cmd = builder.build_fm_demod_command( + device=sdr_device, + frequency_mhz=freq, + sample_rate=22050, + gain=float(gain) if gain and gain != '0' else None, + ppm=int(ppm) if ppm and ppm != '0' else None, + modulation='fm', + squelch=squelch if squelch and squelch != 0 else None, + bias_t=bias_t + ) + multimon_path = get_tool_path('multimon-ng') if not multimon_path: - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None return jsonify({'status': 'error', 'message': 'multimon-ng not found'}), 400 multimon_cmd = [multimon_path, '-t', 'raw'] + decoders + ['-f', 'alpha', '-'] - bias_t = data.get('bias_t', False) - gain_val = float(gain) if gain and gain != '0' else None - ppm_val = int(ppm) if ppm and ppm != '0' else None + full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) + logger.info(f"Running: {full_cmd}") - # Determine if we can use IQ pipeline for live waterfall - use_iq_pipeline = ( - sdr_type == SDRType.RTL_SDR - and not rtl_tcp_host - and get_tool_path('rtl_sdr') is not None - ) - - if use_iq_pipeline: - # IQ pipeline: rtl_sdr -> Python IQ processor -> multimon-ng - iq_sample_rate = 220500 # 22050 * 10 for exact decimation - rtl_cmd = builder.build_raw_capture_command( - device=sdr_device, - frequency_mhz=freq, - sample_rate=iq_sample_rate, - gain=gain_val, - ppm=ppm_val, - bias_t=bias_t, + try: + # Create pipe: rtl_fm | multimon-ng + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE ) + register_process(rtl_process) - full_cmd = ' '.join(rtl_cmd) + ' | [iq_processor] | ' + ' '.join(multimon_cmd) - logger.info(f"Running (IQ pipeline): {full_cmd}") + # Start a thread to monitor rtl_fm stderr for errors + def monitor_rtl_stderr(): + for line in rtl_process.stderr: + err_text = line.decode('utf-8', errors='replace').strip() + if err_text: + logger.debug(f"[RTL_FM] {err_text}") + app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'}) - try: - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(rtl_process) + rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr) + rtl_stderr_thread.daemon = True + rtl_stderr_thread.start() - # Monitor rtl_sdr stderr - def monitor_rtl_stderr(): - for line in rtl_process.stderr: - err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - logger.debug(f"[rtl_sdr] {err_text}") - app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_sdr] {err_text}'}) + # Create a pseudo-terminal for multimon-ng output + master_fd, slave_fd = pty.openpty() - rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True) - rtl_stderr_thread.start() - - # Create PTY for multimon-ng output - master_fd, slave_fd = pty.openpty() - - multimon_process = subprocess.Popen( - multimon_cmd, - stdin=subprocess.PIPE, - stdout=slave_fd, - stderr=slave_fd, - close_fds=True, - ) - register_process(multimon_process) - - os.close(slave_fd) - - # Start IQ processor thread - from routes.listening_post import waterfall_queue - from utils.iq_processor import run_fm_iq_pipeline - - stop_event = threading.Event() - pager_iq_stop_event = stop_event - app_module.waterfall_source = 'pager' - - iq_thread = threading.Thread( - target=run_fm_iq_pipeline, - args=( - rtl_process.stdout, - multimon_process.stdin, - waterfall_queue, - freq, - iq_sample_rate, - stop_event, - ), - daemon=True, - ) - iq_thread.start() - - app_module.current_process = multimon_process - app_module.current_process._rtl_process = rtl_process - app_module.current_process._master_fd = master_fd - - # Start decoder output thread - thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process), daemon=True) - thread.start() - - app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - return jsonify({'status': 'started', 'command': full_cmd, 'waterfall_source': 'pager'}) - - except FileNotFoundError as e: - _cleanup_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) - except Exception as e: - _cleanup_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': str(e)}) - - else: - # Legacy pipeline: rtl_fm | multimon-ng - rtl_cmd = builder.build_fm_demod_command( - device=sdr_device, - frequency_mhz=freq, - sample_rate=22050, - gain=gain_val, - ppm=ppm_val, - modulation='fm', - squelch=squelch if squelch and squelch != 0 else None, - bias_t=bias_t, + multimon_process = subprocess.Popen( + multimon_cmd, + stdin=rtl_process.stdout, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True ) + register_process(multimon_process) - full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) - logger.info(f"Running: {full_cmd}") + os.close(slave_fd) + rtl_process.stdout.close() + app_module.current_process = multimon_process + app_module.current_process._rtl_process = rtl_process + app_module.current_process._master_fd = master_fd + + # Start output thread with PTY master fd + thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process)) + thread.daemon = True + thread.start() + + app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) + + return jsonify({'status': 'started', 'command': full_cmd}) + + except FileNotFoundError as e: + # Kill orphaned rtl_fm process try: - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(rtl_process) - - # Start a thread to monitor rtl_fm stderr for errors - def monitor_rtl_stderr(): - for line in rtl_process.stderr: - err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - logger.debug(f"[RTL_FM] {err_text}") - app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'}) - - rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True) - rtl_stderr_thread.start() - - # Create a pseudo-terminal for multimon-ng output - master_fd, slave_fd = pty.openpty() - - multimon_process = subprocess.Popen( - multimon_cmd, - stdin=rtl_process.stdout, - stdout=slave_fd, - stderr=slave_fd, - close_fds=True, - ) - register_process(multimon_process) - - os.close(slave_fd) - rtl_process.stdout.close() - - app_module.current_process = multimon_process - app_module.current_process._rtl_process = rtl_process - app_module.current_process._master_fd = master_fd - - # Start output thread with PTY master fd - thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process), daemon=True) - thread.start() - - app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - return jsonify({'status': 'started', 'command': full_cmd}) - - except FileNotFoundError as e: - _cleanup_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) - except Exception as e: - _cleanup_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': str(e)}) + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass + # Release device on failure + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device) + pager_active_device = None + return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) + except Exception as e: + # Kill orphaned rtl_fm process if it was started + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass + # Release device on failure + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device) + pager_active_device = None + return jsonify({'status': 'error', 'message': str(e)}) @pager_bp.route('/stop', methods=['POST']) def stop_decoding() -> Response: - global pager_active_device, pager_iq_stop_event + global pager_active_device with app_module.process_lock: if app_module.current_process: - # Stop IQ pipeline if running - if pager_iq_stop_event is not None: - pager_iq_stop_event.set() - pager_iq_stop_event = None - if app_module.waterfall_source == 'pager': - app_module.waterfall_source = None - - # Kill rtl_sdr/rtl_fm process first + # Kill rtl_fm process first if hasattr(app_module.current_process, '_rtl_process'): try: app_module.current_process._rtl_process.terminate() @@ -580,14 +469,14 @@ def stream() -> Response: keepalive_interval = 30.0 # Send keepalive every 30 seconds instead of 1 second while True: - try: - msg = app_module.output_queue.get(timeout=1) - last_keepalive = time.time() - try: - process_event('pager', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) + try: + msg = app_module.output_queue.get(timeout=1) + last_keepalive = time.time() + try: + process_event('pager', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= keepalive_interval: diff --git a/routes/sensor.py b/routes/sensor.py index c78e095..e5a719e 100644 --- a/routes/sensor.py +++ b/routes/sensor.py @@ -18,20 +18,15 @@ from utils.validation import ( validate_frequency, validate_device_index, validate_gain, validate_ppm, validate_rtl_tcp_host, validate_rtl_tcp_port ) -from utils.sse import format_sse -from utils.event_pipeline import process_event +from utils.sse import format_sse +from utils.event_pipeline import process_event from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType -from utils.dependencies import get_tool_path sensor_bp = Blueprint('sensor', __name__) # Track which device is being used sensor_active_device: int | None = None -# IQ pipeline stop event -sensor_iq_stop_event: threading.Event | None = None -# Companion rtl_sdr process when using IQ pipeline -sensor_rtl_process: subprocess.Popen | None = None def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: @@ -65,26 +60,8 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.sensor_queue.put({'type': 'error', 'text': str(e)}) finally: - global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process - # Stop IQ pipeline if running - if sensor_iq_stop_event is not None: - sensor_iq_stop_event.set() - sensor_iq_stop_event = None - if app_module.waterfall_source == 'sensor': - app_module.waterfall_source = None - # Terminate companion rtl_sdr process - if sensor_rtl_process is not None: - try: - sensor_rtl_process.terminate() - sensor_rtl_process.wait(timeout=2) - except Exception: - try: - sensor_rtl_process.kill() - except Exception: - pass - unregister_process(sensor_rtl_process) - sensor_rtl_process = None - # Ensure decoder process is terminated + global sensor_active_device + # Ensure process is terminated try: process.terminate() process.wait(timeout=2) @@ -103,32 +80,9 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: sensor_active_device = None -def _cleanup_sensor_failed_start(rtl_process: subprocess.Popen | None) -> None: - """Clean up after a failed sensor start attempt.""" - global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process - if rtl_process: - try: - rtl_process.terminate() - rtl_process.wait(timeout=2) - except Exception: - try: - rtl_process.kill() - except Exception: - pass - if sensor_iq_stop_event is not None: - sensor_iq_stop_event.set() - sensor_iq_stop_event = None - if app_module.waterfall_source == 'sensor': - app_module.waterfall_source = None - sensor_rtl_process = None - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - - @sensor_bp.route('/start_sensor', methods=['POST']) def start_sensor() -> Response: - global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process + global sensor_active_device with app_module.sensor_lock: if app_module.sensor_process: @@ -190,187 +144,69 @@ def start_sensor() -> Response: sdr_device = SDRFactory.create_default_device(sdr_type, index=device) builder = SDRFactory.get_builder(sdr_device.sdr_type) - bias_t = data.get('bias_t', False) - gain_val = float(gain) if gain and gain != 0 else None - ppm_val = int(ppm) if ppm and ppm != 0 else None - # Determine if we can use IQ pipeline for live waterfall - use_iq_pipeline = ( - sdr_type == SDRType.RTL_SDR - and not rtl_tcp_host - and get_tool_path('rtl_sdr') is not None + # Build ISM band decoder command + bias_t = data.get('bias_t', False) + cmd = builder.build_ism_command( + device=sdr_device, + frequency_mhz=freq, + gain=float(gain) if gain and gain != 0 else None, + ppm=int(ppm) if ppm and ppm != 0 else None, + bias_t=bias_t ) - if use_iq_pipeline: - # IQ pipeline: rtl_sdr -> Python IQ tee -> rtl_433 -r - - iq_sample_rate = 250000 # rtl_433 default + full_cmd = ' '.join(cmd) + logger.info(f"Running: {full_cmd}") - rtl_cmd = builder.build_raw_capture_command( - device=sdr_device, - frequency_mhz=freq, - sample_rate=iq_sample_rate, - gain=gain_val, - ppm=ppm_val, - bias_t=bias_t, + try: + app_module.sensor_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE ) + register_process(app_module.sensor_process) - rtl_433_path = get_tool_path('rtl_433') or 'rtl_433' - decoder_cmd = [rtl_433_path, '-r', '-', '-s', str(iq_sample_rate), '-F', 'json'] + # Start output thread + thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,)) + thread.daemon = True + thread.start() - full_cmd = ' '.join(rtl_cmd) + ' | [iq_processor] | ' + ' '.join(decoder_cmd) - logger.info(f"Running (IQ pipeline): {full_cmd}") + # Monitor stderr + def monitor_stderr(): + for line in app_module.sensor_process.stderr: + err = line.decode('utf-8', errors='replace').strip() + if err: + logger.debug(f"[rtl_433] {err}") + app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) - try: - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(rtl_process) - sensor_rtl_process = rtl_process + stderr_thread = threading.Thread(target=monitor_stderr) + stderr_thread.daemon = True + stderr_thread.start() - # Monitor rtl_sdr stderr - def monitor_rtl_stderr(): - for line in rtl_process.stderr: - err = line.decode('utf-8', errors='replace').strip() - if err: - logger.debug(f"[rtl_sdr] {err}") - app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_sdr] {err}'}) + app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - threading.Thread(target=monitor_rtl_stderr, daemon=True).start() + return jsonify({'status': 'started', 'command': full_cmd}) - # Start rtl_433 reading from stdin - decoder_process = subprocess.Popen( - decoder_cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(decoder_process) - - # Start IQ processor thread - from routes.listening_post import waterfall_queue - from utils.iq_processor import run_passthrough_iq_pipeline - - stop_event = threading.Event() - sensor_iq_stop_event = stop_event - app_module.waterfall_source = 'sensor' - - iq_thread = threading.Thread( - target=run_passthrough_iq_pipeline, - args=( - rtl_process.stdout, - decoder_process.stdin, - waterfall_queue, - freq, - iq_sample_rate, - stop_event, - ), - daemon=True, - ) - iq_thread.start() - - app_module.sensor_process = decoder_process - - # Monitor rtl_433 stderr - def monitor_decoder_stderr(): - for line in decoder_process.stderr: - err = line.decode('utf-8', errors='replace').strip() - if err: - logger.debug(f"[rtl_433] {err}") - app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) - - threading.Thread(target=monitor_decoder_stderr, daemon=True).start() - - # Start output thread - thread = threading.Thread(target=stream_sensor_output, args=(decoder_process,), daemon=True) - thread.start() - - app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - return jsonify({'status': 'started', 'command': full_cmd, 'waterfall_source': 'sensor'}) - - except FileNotFoundError: - _cleanup_sensor_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': 'rtl_sdr or rtl_433 not found'}) - except Exception as e: - _cleanup_sensor_failed_start(rtl_process) - return jsonify({'status': 'error', 'message': str(e)}) - - else: - # Legacy pipeline: rtl_433 directly - cmd = builder.build_ism_command( - device=sdr_device, - frequency_mhz=freq, - gain=gain_val, - ppm=ppm_val, - bias_t=bias_t, - ) - - full_cmd = ' '.join(cmd) - logger.info(f"Running: {full_cmd}") - - try: - app_module.sensor_process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - register_process(app_module.sensor_process) - - # Start output thread - thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,), daemon=True) - thread.start() - - # Monitor stderr - def monitor_stderr(): - for line in app_module.sensor_process.stderr: - err = line.decode('utf-8', errors='replace').strip() - if err: - logger.debug(f"[rtl_433] {err}") - app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) - - threading.Thread(target=monitor_stderr, daemon=True).start() - - app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - return jsonify({'status': 'started', 'command': full_cmd}) - - except FileNotFoundError: - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) - except Exception as e: - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': str(e)}) + except FileNotFoundError: + # Release device on failure + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device) + sensor_active_device = None + return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) + except Exception as e: + # Release device on failure + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device) + sensor_active_device = None + return jsonify({'status': 'error', 'message': str(e)}) @sensor_bp.route('/stop_sensor', methods=['POST']) def stop_sensor() -> Response: - global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process + global sensor_active_device with app_module.sensor_lock: if app_module.sensor_process: - # Stop IQ pipeline if running - if sensor_iq_stop_event is not None: - sensor_iq_stop_event.set() - sensor_iq_stop_event = None - if app_module.waterfall_source == 'sensor': - app_module.waterfall_source = None - - # Kill companion rtl_sdr process - if sensor_rtl_process is not None: - try: - sensor_rtl_process.terminate() - sensor_rtl_process.wait(timeout=2) - except (subprocess.TimeoutExpired, OSError): - try: - sensor_rtl_process.kill() - except OSError: - pass - sensor_rtl_process = None - app_module.sensor_process.terminate() try: app_module.sensor_process.wait(timeout=2) @@ -396,13 +232,13 @@ def stream_sensor() -> Response: while True: try: - msg = app_module.sensor_queue.get(timeout=1) - last_keepalive = time.time() - try: - process_event('sensor', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) + msg = app_module.sensor_queue.get(timeout=1) + last_keepalive = time.time() + try: + process_event('sensor', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) except queue.Empty: now = time.time() if now - last_keepalive >= keepalive_interval: diff --git a/static/js/modes/dmr.js b/static/js/modes/dmr.js index 5e1675d..447afaa 100644 --- a/static/js/modes/dmr.js +++ b/static/js/modes/dmr.js @@ -88,9 +88,6 @@ function startDmr() { if (typeof reserveDevice === 'function') { reserveDevice(parseInt(device), 'dmr'); } - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true }); - } if (typeof showNotification === 'function') { showNotification('DMR', `Decoding ${frequency} MHz (${protocol.toUpperCase()})`); } diff --git a/static/js/modes/listening-post.js b/static/js/modes/listening-post.js index b1b2b54..78c5010 100644 --- a/static/js/modes/listening-post.js +++ b/static/js/modes/listening-post.js @@ -3589,8 +3589,7 @@ async function startWaterfall(options = {}) { lastWaterfallDraw = 0; initWaterfallCanvas(); connectWaterfallSSE(); - // Only reserve device if not decoder-driven (decoder already owns the device) - if (data.source !== 'decoder' && typeof reserveDevice === 'function') { + if (typeof reserveDevice === 'function') { reserveDevice(parseInt(device), 'waterfall'); } if (resume || resumeRfWaterfallAfterListening) { @@ -3619,14 +3618,11 @@ async function stopWaterfall() { } try { - const resp = await fetch('/listening/waterfall/stop', { method: 'POST' }); - let stopData = {}; - try { stopData = await resp.json(); } catch (e) {} + await fetch('/listening/waterfall/stop', { method: 'POST' }); isWaterfallRunning = false; if (waterfallEventSource) { waterfallEventSource.close(); waterfallEventSource = null; } setWaterfallControlButtons(false); - // Only release device if it was a standalone waterfall (not decoder-driven) - if (stopData.source !== 'decoder' && typeof releaseDevice === 'function') { + if (typeof releaseDevice === 'function') { releaseDevice('waterfall'); } } catch (err) { diff --git a/static/js/modes/sstv-general.js b/static/js/modes/sstv-general.js index 1029334..0b89efe 100644 --- a/static/js/modes/sstv-general.js +++ b/static/js/modes/sstv-general.js @@ -98,9 +98,6 @@ const SSTVGeneral = (function() { updateStatusUI('listening', `${frequency} MHz ${modulation.toUpperCase()}`); startStream(); showNotification('SSTV', `Listening on ${frequency} MHz ${modulation.toUpperCase()}`); - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true }); - } // Update strip const stripFreq = document.getElementById('sstvGeneralStripFreq'); diff --git a/static/js/modes/sstv.js b/static/js/modes/sstv.js index ea9d9e6..6bafdb0 100644 --- a/static/js/modes/sstv.js +++ b/static/js/modes/sstv.js @@ -537,18 +537,15 @@ const SSTV = (function() { const data = await response.json(); - if (data.status === 'started' || data.status === 'already_running') { - isRunning = true; - if (typeof reserveDevice === 'function') { - reserveDevice(device, 'sstv'); - } - updateStatusUI('listening', `${frequency} MHz`); - startStream(); - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true }); - } - showNotification('SSTV', `Listening on ${frequency} MHz`); - } else { + if (data.status === 'started' || data.status === 'already_running') { + isRunning = true; + if (typeof reserveDevice === 'function') { + reserveDevice(device, 'sstv'); + } + updateStatusUI('listening', `${frequency} MHz`); + startStream(); + showNotification('SSTV', `Listening on ${frequency} MHz`); + } else { updateStatusUI('idle', 'Start failed'); showStatusMessage(data.message || 'Failed to start decoder', 'error'); } diff --git a/templates/index.html b/templates/index.html index 7982c6f..25131ed 100644 --- a/templates/index.html +++ b/templates/index.html @@ -2859,23 +2859,6 @@ } } - function getModeWaterfallFrequency(mode) { - const lookup = { - pager: 'frequency', - sensor: 'sensorFrequency', - rtlamr: 'rtlamrFrequency', - dmr: 'dmrFrequency', - sstv: 'sstvFrequency', - sstv_general: 'sstvGeneralFrequency', - listening: 'radioScanStart' - }; - const id = lookup[mode]; - if (!id) return NaN; - const el = document.getElementById(id); - const value = parseFloat(el?.value); - return Number.isFinite(value) ? value : NaN; - } - // Mode switching function switchMode(mode, options = {}) { const { updateUrl = true } = options; @@ -3079,19 +3062,13 @@ // Show shared waterfall controls for supported modes const waterfallControlsSection = document.getElementById('waterfallControlsSection'); const waterfallPanel = document.getElementById('waterfallPanel'); - const waterfallModes = ['pager', 'sensor', 'rtlamr', 'dmr', 'sstv', 'sstv_general', 'listening']; + const waterfallModes = ['listening']; const waterfallSupported = waterfallModes.includes(mode); if (waterfallControlsSection) waterfallControlsSection.style.display = waterfallSupported ? 'block' : 'none'; if (waterfallPanel) { const running = (typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning); waterfallPanel.style.display = (waterfallSupported && running) ? 'block' : 'none'; } - if (waterfallSupported && typeof syncWaterfallToFrequency === 'function' && typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning) { - const modeFreq = getModeWaterfallFrequency(mode); - if (Number.isFinite(modeFreq)) { - syncWaterfallToFrequency(modeFreq, { autoStart: true, restartIfRunning: true, silent: true }); - } - } // Toggle mode-specific tool status displays const toolStatusPager = document.getElementById('toolStatusPager'); @@ -3187,9 +3164,6 @@ // Sensor frequency function setSensorFreq(freq) { document.getElementById('sensorFrequency').value = freq; - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning }); - } if (isSensorRunning) { fetch('/stop_sensor', { method: 'POST' }) .then(() => setTimeout(() => startSensorDecoding(), 500)); @@ -3274,9 +3248,6 @@ reserveDevice(parseInt(device), 'sensor'); setSensorRunning(true); startSensorStream(); - if (!remoteConfig && typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true }); - } // Initialize sensor filter bar const filterContainer = document.getElementById('filterBarContainer'); @@ -3530,9 +3501,6 @@ function setRtlamrFreq(freq) { document.getElementById('rtlamrFrequency').value = freq; - if (typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning }); - } } // RTLAMR mode polling timer for agent mode @@ -3588,9 +3556,6 @@ } setRtlamrRunning(true); startRtlamrStream(isAgentMode); - if (!isAgentMode && typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true }); - } // Initialize meter filter bar (reuse sensor filter bar since same structure) const filterContainer = document.getElementById('filterBarContainer'); @@ -4362,9 +4327,6 @@ } setRunning(true); startStream(isAgentMode); - if (!isAgentMode && !remoteConfig && typeof syncWaterfallToFrequency === 'function') { - syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true }); - } // Initialize filter bar const filterContainer = document.getElementById('filterBarContainer'); diff --git a/utils/dependencies.py b/utils/dependencies.py index 1fb57a5..256b995 100644 --- a/utils/dependencies.py +++ b/utils/dependencies.py @@ -121,15 +121,6 @@ TOOL_DEPENDENCIES = { 'manual': 'https://github.com/EliasOenal/multimon-ng' } }, - 'rtl_sdr': { - 'required': False, - 'description': 'Raw IQ capture for live waterfall during decoding', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } - }, 'rtl_test': { 'required': False, 'description': 'RTL-SDR device detection', @@ -152,15 +143,6 @@ TOOL_DEPENDENCIES = { 'brew': 'brew install rtl_433', 'manual': 'https://github.com/merbanan/rtl_433' } - }, - 'rtl_sdr': { - 'required': False, - 'description': 'Raw IQ capture for live waterfall during decoding', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } } } }, diff --git a/utils/iq_processor.py b/utils/iq_processor.py deleted file mode 100644 index a6d1bfa..0000000 --- a/utils/iq_processor.py +++ /dev/null @@ -1,230 +0,0 @@ -"""IQ processing pipelines for live waterfall during SDR decoding. - -Provides two pipeline functions: -- run_fm_iq_pipeline: FM demodulates IQ for pager decoding + FFT for waterfall -- run_passthrough_iq_pipeline: Passes raw IQ to rtl_433 + FFT for waterfall -""" - -from __future__ import annotations - -import logging -import struct -import threading -import queue -from datetime import datetime -from typing import IO, Optional - -import numpy as np - -logger = logging.getLogger('intercept.iq_processor') - -# FFT parameters -FFT_SIZE = 2048 -FFT_INTERVAL_SECONDS = 0.1 # ~10 updates/sec - - -def iq_to_complex(buf: bytes) -> np.ndarray: - """Convert raw uint8 IQ bytes to complex float samples. - - RTL-SDR outputs interleaved uint8 I/Q pairs centered at 127.5. - """ - raw = np.frombuffer(buf, dtype=np.uint8).astype(np.float32) - raw = (raw - 127.5) / 127.5 - return raw[0::2] + 1j * raw[1::2] - - -def compute_fft_bins(samples: np.ndarray, fft_size: int = FFT_SIZE) -> list[float]: - """Compute power spectral density in dB from complex IQ samples. - - Returns a list of power values (dB) for each frequency bin. - """ - if len(samples) < fft_size: - # Pad with zeros if not enough samples - padded = np.zeros(fft_size, dtype=np.complex64) - padded[:len(samples)] = samples[:fft_size] - samples = padded - else: - samples = samples[:fft_size] - - # Apply Hanning window to reduce spectral leakage - window = np.hanning(fft_size).astype(np.float32) - windowed = samples * window - - # FFT and shift DC to center - spectrum = np.fft.fftshift(np.fft.fft(windowed)) - - # Power in dB (avoid log of zero) - power = np.abs(spectrum) ** 2 - power = np.maximum(power, 1e-20) - power_db = 10.0 * np.log10(power) - - return power_db.tolist() - - -def _push_waterfall(waterfall_queue: queue.Queue, bins: list[float], - center_freq_mhz: float, sample_rate: int) -> None: - """Push a waterfall sweep message to the queue.""" - half_span = (sample_rate / 1e6) / 2.0 - msg = { - 'type': 'waterfall_sweep', - 'start_freq': center_freq_mhz - half_span, - 'end_freq': center_freq_mhz + half_span, - 'bins': bins, - 'timestamp': datetime.now().isoformat(), - } - try: - waterfall_queue.put_nowait(msg) - except queue.Full: - # Drop oldest and retry - try: - waterfall_queue.get_nowait() - except queue.Empty: - pass - try: - waterfall_queue.put_nowait(msg) - except queue.Full: - pass - - -def run_fm_iq_pipeline( - iq_stdout: IO[bytes], - audio_stdin: IO[bytes], - waterfall_queue: queue.Queue, - center_freq_mhz: float, - sample_rate: int, - stop_event: threading.Event, -) -> None: - """FM demodulation pipeline: IQ -> FFT + FM demod -> 22050 Hz PCM. - - Reads raw uint8 IQ from rtl_sdr stdout, computes FFT for waterfall, - FM demodulates, decimates to 22050 Hz, and writes 16-bit PCM to - multimon-ng stdin. - - Args: - iq_stdout: rtl_sdr stdout (raw uint8 IQ) - audio_stdin: multimon-ng stdin (16-bit PCM) - waterfall_queue: Queue for waterfall sweep messages - center_freq_mhz: Center frequency in MHz - sample_rate: IQ sample rate (should be 220500 for 10x decimation to 22050) - stop_event: Threading event to signal shutdown - """ - from scipy.signal import decimate as scipy_decimate - - # Decimation factor: sample_rate / 22050 - decim_factor = sample_rate // 22050 - if decim_factor < 1: - decim_factor = 1 - - # Read in chunks: ~100ms worth of IQ data (2 bytes per sample: I + Q) - chunk_bytes = int(sample_rate * FFT_INTERVAL_SECONDS) * 2 - # Align to even number of bytes (I/Q pairs) - chunk_bytes = (chunk_bytes // 2) * 2 - - # Previous sample for FM demod continuity - prev_sample = np.complex64(0) - - logger.info(f"FM IQ pipeline started: {center_freq_mhz} MHz, " - f"sr={sample_rate}, decim={decim_factor}") - - try: - while not stop_event.is_set(): - raw = iq_stdout.read(chunk_bytes) - if not raw: - break - - # Convert to complex IQ - iq = iq_to_complex(raw) - if len(iq) == 0: - continue - - # Compute FFT for waterfall - bins = compute_fft_bins(iq, FFT_SIZE) - _push_waterfall(waterfall_queue, bins, center_freq_mhz, sample_rate) - - # FM demodulation via instantaneous phase difference - # Prepend previous sample for continuity - iq_with_prev = np.concatenate(([prev_sample], iq)) - prev_sample = iq[-1] - - phase_diff = np.angle(iq_with_prev[1:] * np.conj(iq_with_prev[:-1])) - - # Decimate to 22050 Hz - if decim_factor > 1: - audio = scipy_decimate(phase_diff, decim_factor, ftype='fir') - else: - audio = phase_diff - - # Scale to 16-bit PCM range - audio = np.clip(audio * 10000, -32767, 32767).astype(np.int16) - - # Write to multimon-ng - try: - audio_stdin.write(audio.tobytes()) - audio_stdin.flush() - except (BrokenPipeError, OSError): - break - - except Exception as e: - logger.error(f"FM IQ pipeline error: {e}") - finally: - logger.info("FM IQ pipeline stopped") - try: - audio_stdin.close() - except Exception: - pass - - -def run_passthrough_iq_pipeline( - iq_stdout: IO[bytes], - decoder_stdin: IO[bytes], - waterfall_queue: queue.Queue, - center_freq_mhz: float, - sample_rate: int, - stop_event: threading.Event, -) -> None: - """Passthrough pipeline: IQ -> FFT + raw bytes to decoder. - - Reads raw uint8 IQ from rtl_sdr stdout, computes FFT for waterfall, - and writes raw IQ bytes unchanged to rtl_433 stdin. - - Args: - iq_stdout: rtl_sdr stdout (raw uint8 IQ) - decoder_stdin: rtl_433 stdin (raw cu8 IQ) - waterfall_queue: Queue for waterfall sweep messages - center_freq_mhz: Center frequency in MHz - sample_rate: IQ sample rate (should be 250000 for rtl_433) - stop_event: Threading event to signal shutdown - """ - # Read in chunks: ~100ms worth of IQ data - chunk_bytes = int(sample_rate * FFT_INTERVAL_SECONDS) * 2 - chunk_bytes = (chunk_bytes // 2) * 2 - - logger.info(f"Passthrough IQ pipeline started: {center_freq_mhz} MHz, sr={sample_rate}") - - try: - while not stop_event.is_set(): - raw = iq_stdout.read(chunk_bytes) - if not raw: - break - - # Compute FFT for waterfall - iq = iq_to_complex(raw) - if len(iq) > 0: - bins = compute_fft_bins(iq, FFT_SIZE) - _push_waterfall(waterfall_queue, bins, center_freq_mhz, sample_rate) - - # Pass raw bytes unchanged to decoder - try: - decoder_stdin.write(raw) - decoder_stdin.flush() - except (BrokenPipeError, OSError): - break - - except Exception as e: - logger.error(f"Passthrough IQ pipeline error: {e}") - finally: - logger.info("Passthrough IQ pipeline stopped") - try: - decoder_stdin.close() - except Exception: - pass diff --git a/utils/sdr/base.py b/utils/sdr/base.py index 411d923..4dc79be 100644 --- a/utils/sdr/base.py +++ b/utils/sdr/base.py @@ -186,36 +186,6 @@ class CommandBuilder(ABC): """Return hardware capabilities for this SDR type.""" pass - def build_raw_capture_command( - self, - device: SDRDevice, - frequency_mhz: float, - sample_rate: int, - gain: Optional[float] = None, - ppm: Optional[int] = None, - bias_t: bool = False - ) -> list[str]: - """ - Build raw IQ capture command (for IQ-based waterfall during decoding). - - Args: - device: The SDR device to use - frequency_mhz: Center frequency in MHz - sample_rate: Sample rate in Hz - gain: Gain in dB (None for auto) - ppm: PPM frequency correction - bias_t: Enable bias-T power (for active antennas) - - Returns: - Command as list of strings for subprocess - - Raises: - NotImplementedError: If the SDR type does not support raw capture - """ - raise NotImplementedError( - f"Raw IQ capture not supported for {self.get_sdr_type().value}" - ) - @classmethod @abstractmethod def get_sdr_type(cls) -> SDRType: diff --git a/utils/sdr/rtlsdr.py b/utils/sdr/rtlsdr.py index 5de2551..6d2b8d8 100644 --- a/utils/sdr/rtlsdr.py +++ b/utils/sdr/rtlsdr.py @@ -197,43 +197,6 @@ class RTLSDRCommandBuilder(CommandBuilder): return cmd - def build_raw_capture_command( - self, - device: SDRDevice, - frequency_mhz: float, - sample_rate: int, - gain: Optional[float] = None, - ppm: Optional[int] = None, - bias_t: bool = False - ) -> list[str]: - """ - Build rtl_sdr command for raw IQ capture. - - Outputs raw uint8 IQ to stdout for processing by IQ pipelines. - """ - rtl_sdr_path = get_tool_path('rtl_sdr') or 'rtl_sdr' - freq_hz = int(frequency_mhz * 1e6) - cmd = [ - rtl_sdr_path, - '-d', self._get_device_arg(device), - '-f', str(freq_hz), - '-s', str(sample_rate), - ] - - if gain is not None and gain > 0: - cmd.extend(['-g', str(gain)]) - - if ppm is not None and ppm != 0: - cmd.extend(['-p', str(ppm)]) - - if bias_t: - cmd.extend(['-T']) - - # Output to stdout - cmd.append('-') - - return cmd - def build_ais_command( self, device: SDRDevice,