diff --git a/CHANGELOG.md b/CHANGELOG.md index c018d85..bcfdd59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ All notable changes to iNTERCEPT will be documented in this file. ### Fixed - Waterfall control panel rendered as unstyled text for up to 20 seconds on first visit — CSS is now loaded eagerly with the rest of the page assets - WebSDR globe failed to render on first page load — initialization now waits for a layout frame before mounting the WebGL renderer, ensuring the container has non-zero dimensions +- Waterfall monitor audio took minutes to start — `_waitForPlayback` now only reports success on actual audio playback (`playing`/`timeupdate`), not from the WAV header alone (`loadeddata`/`canplay`) +- Waterfall monitor could not be stopped — `stopMonitor()` now pauses audio and updates the UI immediately instead of waiting for the backend stop request (which blocked for 1+ seconds during SDR process cleanup) --- diff --git a/config.py b/config.py index f2ac362..4d338b3 100644 --- a/config.py +++ b/config.py @@ -17,6 +17,8 @@ CHANGELOG = [ "highlights": [ "Waterfall control panel no longer shows as unstyled text on first visit", "WebSDR globe renders correctly on first page load without requiring a refresh", + "Waterfall monitor audio no longer takes minutes to start — playback detection now waits for real audio data instead of just the WAV header", + "Waterfall monitor stop is now instant — audio pauses and UI updates immediately instead of waiting for backend cleanup", ] }, { diff --git a/routes/listening_post.py b/routes/listening_post.py index 386303d..183ccc7 100644 --- a/routes/listening_post.py +++ b/routes/listening_post.py @@ -5,22 +5,22 @@ from __future__ import annotations import json import math import os -import queue -import select -import signal -import shutil -import struct -import subprocess -import threading -import time +import queue +import select +import signal +import shutil +import struct +import subprocess +import threading +import time from datetime import datetime -from typing import Any, Dict, Generator, List, Optional +from typing import Any, Dict, Generator, List, Optional from flask import Blueprint, jsonify, request, Response import app as app_module from utils.logging import get_logger -from utils.sse import sse_stream_fanout +from utils.sse import sse_stream_fanout from utils.event_pipeline import process_event from utils.constants import ( SSE_QUEUE_TIMEOUT, @@ -29,9 +29,9 @@ from utils.constants import ( ) from utils.sdr import SDRFactory, SDRType -logger = get_logger('intercept.receiver') - -receiver_bp = Blueprint('receiver', __name__, url_prefix='/receiver') +logger = get_logger('intercept.receiver') + +receiver_bp = Blueprint('receiver', __name__, url_prefix='/receiver') # ============================================ # GLOBAL STATE @@ -41,10 +41,10 @@ receiver_bp = Blueprint('receiver', __name__, url_prefix='/receiver') audio_process = None audio_rtl_process = None audio_lock = threading.Lock() -audio_running = False -audio_frequency = 0.0 -audio_modulation = 'fm' -audio_source = 'process' +audio_running = False +audio_frequency = 0.0 +audio_modulation = 'fm' +audio_source = 'process' # Scanner state scanner_thread: Optional[threading.Thread] = None @@ -104,37 +104,37 @@ def find_ffmpeg() -> str | None: return shutil.which('ffmpeg') -VALID_MODULATIONS = ['fm', 'wfm', 'am', 'usb', 'lsb'] - - -def normalize_modulation(value: str) -> str: - """Normalize and validate modulation string.""" - mod = str(value or '').lower().strip() - if mod not in VALID_MODULATIONS: - raise ValueError(f'Invalid modulation. Use: {", ".join(VALID_MODULATIONS)}') - return mod - - -def _rtl_fm_demod_mode(modulation: str) -> str: - """Map UI modulation names to rtl_fm demod tokens.""" - mod = str(modulation or '').lower().strip() - return 'wbfm' if mod == 'wfm' else mod - - -def _wav_header(sample_rate: int = 48000, bits_per_sample: int = 16, channels: int = 1) -> bytes: - """Create a streaming WAV header with unknown data length.""" - bytes_per_sample = bits_per_sample // 8 - byte_rate = sample_rate * channels * bytes_per_sample - block_align = channels * bytes_per_sample - return ( - b'RIFF' - + struct.pack(' str: + """Normalize and validate modulation string.""" + mod = str(value or '').lower().strip() + if mod not in VALID_MODULATIONS: + raise ValueError(f'Invalid modulation. Use: {", ".join(VALID_MODULATIONS)}') + return mod + + +def _rtl_fm_demod_mode(modulation: str) -> str: + """Map UI modulation names to rtl_fm demod tokens.""" + mod = str(modulation or '').lower().strip() + return 'wbfm' if mod == 'wfm' else mod + + +def _wav_header(sample_rate: int = 48000, bits_per_sample: int = 16, channels: int = 1) -> bytes: + """Create a streaming WAV header with unknown data length.""" + bytes_per_sample = bits_per_sample // 8 + byte_rate = sample_rate * channels * bytes_per_sample + block_align = channels * bytes_per_sample + return ( + b'RIFF' + + struct.pack(' Response: }) -@receiver_bp.route('/scanner/stream') -def stream_scanner_events() -> Response: - """SSE stream for scanner events.""" - def _on_msg(msg: dict[str, Any]) -> None: - process_event('receiver_scanner', msg, msg.get('type')) - - response = Response( - sse_stream_fanout( - source_queue=scanner_queue, - channel_key='receiver_scanner', - timeout=SSE_QUEUE_TIMEOUT, - keepalive_interval=SSE_KEEPALIVE_INTERVAL, - on_message=_on_msg, - ), - mimetype='text/event-stream', - ) - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' - return response +@receiver_bp.route('/scanner/stream') +def stream_scanner_events() -> Response: + """SSE stream for scanner events.""" + def _on_msg(msg: dict[str, Any]) -> None: + process_event('receiver_scanner', msg, msg.get('type')) + + response = Response( + sse_stream_fanout( + source_queue=scanner_queue, + channel_key='receiver_scanner', + timeout=SSE_QUEUE_TIMEOUT, + keepalive_interval=SSE_KEEPALIVE_INTERVAL, + on_message=_on_msg, + ), + mimetype='text/event-stream', + ) + response.headers['Cache-Control'] = 'no-cache' + response.headers['X-Accel-Buffering'] = 'no' + return response @receiver_bp.route('/scanner/log') @@ -1268,10 +1270,10 @@ def get_presets() -> Response: # ============================================ @receiver_bp.route('/audio/start', methods=['POST']) -def start_audio() -> Response: - """Start audio at specific frequency (manual mode).""" - global scanner_running, scanner_active_device, receiver_active_device, scanner_power_process, scanner_thread - global audio_running, audio_frequency, audio_modulation, audio_source +def start_audio() -> Response: + """Start audio at specific frequency (manual mode).""" + global scanner_running, scanner_active_device, receiver_active_device, scanner_power_process, scanner_thread + global audio_running, audio_frequency, audio_modulation, audio_source # Stop scanner if running if scanner_running: @@ -1302,23 +1304,23 @@ def start_audio() -> Response: data = request.json or {} - try: - frequency = float(data.get('frequency', 0)) - modulation = normalize_modulation(data.get('modulation', 'wfm')) - squelch = int(data.get('squelch', 0)) - gain = int(data.get('gain', 40)) - device = int(data.get('device', 0)) - sdr_type = str(data.get('sdr_type', 'rtlsdr')).lower() - bias_t_raw = data.get('bias_t', scanner_config.get('bias_t', False)) - if isinstance(bias_t_raw, str): - bias_t = bias_t_raw.strip().lower() in {'1', 'true', 'yes', 'on'} - else: - bias_t = bool(bias_t_raw) - except (ValueError, TypeError) as e: - return jsonify({ - 'status': 'error', - 'message': f'Invalid parameter: {e}' - }), 400 + try: + frequency = float(data.get('frequency', 0)) + modulation = normalize_modulation(data.get('modulation', 'wfm')) + squelch = int(data.get('squelch', 0)) + gain = int(data.get('gain', 40)) + device = int(data.get('device', 0)) + sdr_type = str(data.get('sdr_type', 'rtlsdr')).lower() + bias_t_raw = data.get('bias_t', scanner_config.get('bias_t', False)) + if isinstance(bias_t_raw, str): + bias_t = bias_t_raw.strip().lower() in {'1', 'true', 'yes', 'on'} + else: + bias_t = bool(bias_t_raw) + except (ValueError, TypeError) as e: + return jsonify({ + 'status': 'error', + 'message': f'Invalid parameter: {e}' + }), 400 if frequency <= 0: return jsonify({ @@ -1335,51 +1337,51 @@ def start_audio() -> Response: # Update config for audio scanner_config['squelch'] = squelch - scanner_config['gain'] = gain - scanner_config['device'] = device - scanner_config['sdr_type'] = sdr_type - scanner_config['bias_t'] = bias_t - - # Preferred path: when waterfall WebSocket is active on the same SDR, - # derive monitor audio from that IQ stream instead of spawning rtl_fm. - try: - from routes.waterfall_websocket import ( - get_shared_capture_status, - start_shared_monitor_from_capture, - ) - - shared = get_shared_capture_status() - if shared.get('running') and shared.get('device') == device: - _stop_audio_stream() - ok, msg = start_shared_monitor_from_capture( - device=device, - frequency_mhz=frequency, - modulation=modulation, - squelch=squelch, - ) - if ok: - audio_running = True - audio_frequency = frequency - audio_modulation = modulation - audio_source = 'waterfall' - # Shared monitor uses the waterfall's existing SDR claim. - if receiver_active_device is not None: - app_module.release_sdr_device(receiver_active_device) - receiver_active_device = None - return jsonify({ - 'status': 'started', - 'frequency': frequency, - 'modulation': modulation, - 'source': 'waterfall', - }) - logger.warning(f"Shared waterfall monitor unavailable: {msg}") - except Exception as e: - logger.debug(f"Shared waterfall monitor probe failed: {e}") - - # Stop waterfall if it's using the same SDR (SSE path) - if waterfall_running and waterfall_active_device == device: - _stop_waterfall_internal() - time.sleep(0.2) + scanner_config['gain'] = gain + scanner_config['device'] = device + scanner_config['sdr_type'] = sdr_type + scanner_config['bias_t'] = bias_t + + # Preferred path: when waterfall WebSocket is active on the same SDR, + # derive monitor audio from that IQ stream instead of spawning rtl_fm. + try: + from routes.waterfall_websocket import ( + get_shared_capture_status, + start_shared_monitor_from_capture, + ) + + shared = get_shared_capture_status() + if shared.get('running') and shared.get('device') == device: + _stop_audio_stream() + ok, msg = start_shared_monitor_from_capture( + device=device, + frequency_mhz=frequency, + modulation=modulation, + squelch=squelch, + ) + if ok: + audio_running = True + audio_frequency = frequency + audio_modulation = modulation + audio_source = 'waterfall' + # Shared monitor uses the waterfall's existing SDR claim. + if receiver_active_device is not None: + app_module.release_sdr_device(receiver_active_device) + receiver_active_device = None + return jsonify({ + 'status': 'started', + 'frequency': frequency, + 'modulation': modulation, + 'source': 'waterfall', + }) + logger.warning(f"Shared waterfall monitor unavailable: {msg}") + except Exception as e: + logger.debug(f"Shared waterfall monitor probe failed: {e}") + + # Stop waterfall if it's using the same SDR (SSE path) + if waterfall_running and waterfall_active_device == device: + _stop_waterfall_internal() + time.sleep(0.2) # Claim device for listening audio. The WebSocket waterfall handler # may still be tearing down its IQ capture process (thread join + @@ -1390,15 +1392,15 @@ def start_audio() -> Response: app_module.release_sdr_device(receiver_active_device) receiver_active_device = None - error = None - max_claim_attempts = 6 - for attempt in range(max_claim_attempts): - error = app_module.claim_sdr_device(device, 'receiver') - if not error: - break - if attempt < max_claim_attempts - 1: - logger.debug( - f"Device claim attempt {attempt + 1}/{max_claim_attempts} " + error = None + max_claim_attempts = 6 + for attempt in range(max_claim_attempts): + error = app_module.claim_sdr_device(device, 'receiver') + if not error: + break + if attempt < max_claim_attempts - 1: + logger.debug( + f"Device claim attempt {attempt + 1}/{max_claim_attempts} " f"failed, retrying in 0.5s: {error}" ) time.sleep(0.5) @@ -1411,40 +1413,40 @@ def start_audio() -> Response: }), 409 receiver_active_device = device - _start_audio_stream(frequency, modulation) - - if audio_running: - audio_source = 'process' - return jsonify({ - 'status': 'started', - 'frequency': frequency, - 'modulation': modulation, - 'source': 'process', - }) - else: - # Avoid leaving a stale device claim after startup failure. - if receiver_active_device is not None: - app_module.release_sdr_device(receiver_active_device) - receiver_active_device = None - - start_error = '' - for log_path in ('/tmp/rtl_fm_stderr.log', '/tmp/ffmpeg_stderr.log'): - try: - with open(log_path, 'r') as handle: - content = handle.read().strip() - if content: - start_error = content.splitlines()[-1] - break - except Exception: - continue - - message = 'Failed to start audio. Check SDR device.' - if start_error: - message = f'Failed to start audio: {start_error}' - return jsonify({ - 'status': 'error', - 'message': message - }), 500 + _start_audio_stream(frequency, modulation) + + if audio_running: + audio_source = 'process' + return jsonify({ + 'status': 'started', + 'frequency': frequency, + 'modulation': modulation, + 'source': 'process', + }) + else: + # Avoid leaving a stale device claim after startup failure. + if receiver_active_device is not None: + app_module.release_sdr_device(receiver_active_device) + receiver_active_device = None + + start_error = '' + for log_path in ('/tmp/rtl_fm_stderr.log', '/tmp/ffmpeg_stderr.log'): + try: + with open(log_path, 'r') as handle: + content = handle.read().strip() + if content: + start_error = content.splitlines()[-1] + break + except Exception: + continue + + message = 'Failed to start audio. Check SDR device.' + if start_error: + message = f'Failed to start audio: {start_error}' + return jsonify({ + 'status': 'error', + 'message': message + }), 500 @receiver_bp.route('/audio/stop', methods=['POST']) @@ -1458,30 +1460,30 @@ def stop_audio() -> Response: return jsonify({'status': 'stopped'}) -@receiver_bp.route('/audio/status') -def audio_status() -> Response: - """Get audio status.""" - running = audio_running - if audio_source == 'waterfall': - try: - from routes.waterfall_websocket import get_shared_capture_status - - shared = get_shared_capture_status() - running = bool(shared.get('running') and shared.get('monitor_enabled')) - except Exception: - running = False - - return jsonify({ - 'running': running, - 'frequency': audio_frequency, - 'modulation': audio_modulation, - 'source': audio_source, - }) +@receiver_bp.route('/audio/status') +def audio_status() -> Response: + """Get audio status.""" + running = audio_running + if audio_source == 'waterfall': + try: + from routes.waterfall_websocket import get_shared_capture_status + + shared = get_shared_capture_status() + running = bool(shared.get('running') and shared.get('monitor_enabled')) + except Exception: + running = False + + return jsonify({ + 'running': running, + 'frequency': audio_frequency, + 'modulation': audio_modulation, + 'source': audio_source, + }) @receiver_bp.route('/audio/debug') -def audio_debug() -> Response: - """Get audio debug status and recent stderr logs.""" +def audio_debug() -> Response: + """Get audio debug status and recent stderr logs.""" rtl_log_path = '/tmp/rtl_fm_stderr.log' ffmpeg_log_path = '/tmp/ffmpeg_stderr.log' sample_path = '/tmp/audio_probe.bin' @@ -1493,53 +1495,53 @@ def audio_debug() -> Response: except Exception: return '' - shared = {} - if audio_source == 'waterfall': - try: - from routes.waterfall_websocket import get_shared_capture_status - - shared = get_shared_capture_status() - except Exception: - shared = {} - - return jsonify({ - 'running': audio_running, - 'frequency': audio_frequency, - 'modulation': audio_modulation, - 'source': audio_source, - 'sdr_type': scanner_config.get('sdr_type', 'rtlsdr'), - 'device': scanner_config.get('device', 0), - 'gain': scanner_config.get('gain', 0), - 'squelch': scanner_config.get('squelch', 0), - 'audio_process_alive': bool(audio_process and audio_process.poll() is None), - 'shared_capture': shared, - 'rtl_fm_stderr': _read_log(rtl_log_path), - 'ffmpeg_stderr': _read_log(ffmpeg_log_path), - 'audio_probe_bytes': os.path.getsize(sample_path) if os.path.exists(sample_path) else 0, - }) + shared = {} + if audio_source == 'waterfall': + try: + from routes.waterfall_websocket import get_shared_capture_status + + shared = get_shared_capture_status() + except Exception: + shared = {} + + return jsonify({ + 'running': audio_running, + 'frequency': audio_frequency, + 'modulation': audio_modulation, + 'source': audio_source, + 'sdr_type': scanner_config.get('sdr_type', 'rtlsdr'), + 'device': scanner_config.get('device', 0), + 'gain': scanner_config.get('gain', 0), + 'squelch': scanner_config.get('squelch', 0), + 'audio_process_alive': bool(audio_process and audio_process.poll() is None), + 'shared_capture': shared, + 'rtl_fm_stderr': _read_log(rtl_log_path), + 'ffmpeg_stderr': _read_log(ffmpeg_log_path), + 'audio_probe_bytes': os.path.getsize(sample_path) if os.path.exists(sample_path) else 0, + }) -@receiver_bp.route('/audio/probe') -def audio_probe() -> Response: - """Grab a small chunk of audio bytes from the pipeline for debugging.""" - global audio_process - - if audio_source == 'waterfall': - try: - from routes.waterfall_websocket import read_shared_monitor_audio_chunk - - data = read_shared_monitor_audio_chunk(timeout=2.0) - if not data: - return jsonify({'status': 'error', 'message': 'no shared audio data available'}), 504 - sample_path = '/tmp/audio_probe.bin' - with open(sample_path, 'wb') as handle: - handle.write(data) - return jsonify({'status': 'ok', 'bytes': len(data), 'source': 'waterfall'}) - except Exception as e: - return jsonify({'status': 'error', 'message': str(e)}), 500 - - if not audio_process or not audio_process.stdout: - return jsonify({'status': 'error', 'message': 'audio process not running'}), 400 +@receiver_bp.route('/audio/probe') +def audio_probe() -> Response: + """Grab a small chunk of audio bytes from the pipeline for debugging.""" + global audio_process + + if audio_source == 'waterfall': + try: + from routes.waterfall_websocket import read_shared_monitor_audio_chunk + + data = read_shared_monitor_audio_chunk(timeout=2.0) + if not data: + return jsonify({'status': 'error', 'message': 'no shared audio data available'}), 504 + sample_path = '/tmp/audio_probe.bin' + with open(sample_path, 'wb') as handle: + handle.write(data) + return jsonify({'status': 'ok', 'bytes': len(data), 'source': 'waterfall'}) + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + + if not audio_process or not audio_process.stdout: + return jsonify({'status': 'error', 'message': 'audio process not running'}), 400 sample_path = '/tmp/audio_probe.bin' size = 0 @@ -1559,61 +1561,61 @@ def audio_probe() -> Response: return jsonify({'status': 'ok', 'bytes': size}) -@receiver_bp.route('/audio/stream') -def stream_audio() -> Response: - """Stream WAV audio.""" - if audio_source == 'waterfall': - for _ in range(40): - if audio_running: - break - time.sleep(0.05) - - if not audio_running: - return Response(b'', mimetype='audio/wav', status=204) - - def generate_shared(): - global audio_running, audio_source - try: - from routes.waterfall_websocket import ( - get_shared_capture_status, - read_shared_monitor_audio_chunk, - ) - except Exception: - return - - # Browser expects an immediate WAV header. - yield _wav_header(sample_rate=48000) - - while audio_running and audio_source == 'waterfall': - chunk = read_shared_monitor_audio_chunk(timeout=1.0) - if chunk: - yield chunk - continue - shared = get_shared_capture_status() - if not shared.get('running') or not shared.get('monitor_enabled'): - audio_running = False - audio_source = 'process' - break - - return Response( - generate_shared(), - mimetype='audio/wav', - headers={ - 'Content-Type': 'audio/wav', - 'Cache-Control': 'no-cache, no-store', - 'X-Accel-Buffering': 'no', - 'Transfer-Encoding': 'chunked', - } - ) - - # Wait for audio process to be ready (up to 2 seconds). - for _ in range(40): - if audio_running and audio_process: - break - time.sleep(0.05) - - if not audio_running or not audio_process: - return Response(b'', mimetype='audio/wav', status=204) +@receiver_bp.route('/audio/stream') +def stream_audio() -> Response: + """Stream WAV audio.""" + if audio_source == 'waterfall': + for _ in range(40): + if audio_running: + break + time.sleep(0.05) + + if not audio_running: + return Response(b'', mimetype='audio/wav', status=204) + + def generate_shared(): + global audio_running, audio_source + try: + from routes.waterfall_websocket import ( + get_shared_capture_status, + read_shared_monitor_audio_chunk, + ) + except Exception: + return + + # Browser expects an immediate WAV header. + yield _wav_header(sample_rate=48000) + + while audio_running and audio_source == 'waterfall': + chunk = read_shared_monitor_audio_chunk(timeout=1.0) + if chunk: + yield chunk + continue + shared = get_shared_capture_status() + if not shared.get('running') or not shared.get('monitor_enabled'): + audio_running = False + audio_source = 'process' + break + + return Response( + generate_shared(), + mimetype='audio/wav', + headers={ + 'Content-Type': 'audio/wav', + 'Cache-Control': 'no-cache, no-store', + 'X-Accel-Buffering': 'no', + 'Transfer-Encoding': 'chunked', + } + ) + + # Wait for audio process to be ready (up to 2 seconds). + for _ in range(40): + if audio_running and audio_process: + break + time.sleep(0.05) + + if not audio_running or not audio_process: + return Response(b'', mimetype='audio/wav', status=204) def generate(): # Capture local reference to avoid race condition with stop @@ -1638,29 +1640,29 @@ def stream_audio() -> Response: if header_chunk: yield header_chunk - # Stream real-time audio - first_chunk_deadline = time.time() + 20.0 - warned_wait = False - while audio_running and proc.poll() is None: - # Use select to avoid blocking forever - ready, _, _ = select.select([proc.stdout], [], [], 2.0) - if ready: - chunk = proc.stdout.read(8192) - if chunk: - warned_wait = False - yield chunk - else: - break - else: - # Keep connection open while demodulator settles. - if time.time() > first_chunk_deadline: - if not warned_wait: - logger.warning("Audio stream still waiting for first chunk") - warned_wait = True - continue - # Timeout - check if process died - if proc.poll() is not None: - break + # Stream real-time audio + first_chunk_deadline = time.time() + 20.0 + warned_wait = False + while audio_running and proc.poll() is None: + # Use select to avoid blocking forever + ready, _, _ = select.select([proc.stdout], [], [], 2.0) + if ready: + chunk = proc.stdout.read(8192) + if chunk: + warned_wait = False + yield chunk + else: + break + else: + # Keep connection open while demodulator settles. + if time.time() > first_chunk_deadline: + if not warned_wait: + logger.warning("Audio stream still waiting for first chunk") + warned_wait = True + continue + # Timeout - check if process died + if proc.poll() is not None: + break except GeneratorExit: pass except Exception as e: @@ -1786,26 +1788,26 @@ def _parse_rtl_power_line(line: str) -> tuple[str | None, float | None, float | return timestamp, None, None, [] -def _waterfall_loop(): - """Continuous rtl_power sweep loop emitting waterfall data.""" - global waterfall_running, waterfall_process - - def _queue_waterfall_error(message: str) -> None: - try: - waterfall_queue.put_nowait({ - 'type': 'waterfall_error', - 'message': message, - 'timestamp': datetime.now().isoformat(), - }) - except queue.Full: - pass - - rtl_power_path = find_rtl_power() - if not rtl_power_path: - logger.error("rtl_power not found for waterfall") - _queue_waterfall_error('rtl_power not found') - waterfall_running = False - return +def _waterfall_loop(): + """Continuous rtl_power sweep loop emitting waterfall data.""" + global waterfall_running, waterfall_process + + def _queue_waterfall_error(message: str) -> None: + try: + waterfall_queue.put_nowait({ + 'type': 'waterfall_error', + 'message': message, + 'timestamp': datetime.now().isoformat(), + }) + except queue.Full: + pass + + rtl_power_path = find_rtl_power() + if not rtl_power_path: + logger.error("rtl_power not found for waterfall") + _queue_waterfall_error('rtl_power not found') + waterfall_running = False + return start_hz = int(waterfall_config['start_freq'] * 1e6) end_hz = int(waterfall_config['end_freq'] * 1e6) @@ -1823,49 +1825,49 @@ def _waterfall_loop(): ] try: - waterfall_process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=1, - text=True, - ) - - # Detect immediate startup failures (e.g. device busy / no device). - time.sleep(0.35) - if waterfall_process.poll() is not None: - stderr_text = '' - try: - if waterfall_process.stderr: - stderr_text = waterfall_process.stderr.read().strip() - except Exception: - stderr_text = '' - msg = stderr_text or f'rtl_power exited early (code {waterfall_process.returncode})' - logger.error(f"Waterfall startup failed: {msg}") - _queue_waterfall_error(msg) - return - - current_ts = None - all_bins: list[float] = [] - sweep_start_hz = start_hz - sweep_end_hz = end_hz - received_any = False - - if not waterfall_process.stdout: - _queue_waterfall_error('rtl_power stdout unavailable') - return - - for line in waterfall_process.stdout: - if not waterfall_running: - break - - ts, seg_start, seg_end, bins = _parse_rtl_power_line(line) - if ts is None or not bins: - continue - received_any = True - - if current_ts is None: - current_ts = ts + waterfall_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=1, + text=True, + ) + + # Detect immediate startup failures (e.g. device busy / no device). + time.sleep(0.35) + if waterfall_process.poll() is not None: + stderr_text = '' + try: + if waterfall_process.stderr: + stderr_text = waterfall_process.stderr.read().strip() + except Exception: + stderr_text = '' + msg = stderr_text or f'rtl_power exited early (code {waterfall_process.returncode})' + logger.error(f"Waterfall startup failed: {msg}") + _queue_waterfall_error(msg) + return + + current_ts = None + all_bins: list[float] = [] + sweep_start_hz = start_hz + sweep_end_hz = end_hz + received_any = False + + if not waterfall_process.stdout: + _queue_waterfall_error('rtl_power stdout unavailable') + return + + for line in waterfall_process.stdout: + if not waterfall_running: + break + + ts, seg_start, seg_end, bins = _parse_rtl_power_line(line) + if ts is None or not bins: + continue + received_any = True + + if current_ts is None: + current_ts = ts if ts != current_ts and all_bins: max_bins = int(waterfall_config.get('max_bins') or 0) @@ -1903,11 +1905,11 @@ def _waterfall_loop(): sweep_end_hz = max(sweep_end_hz, seg_end) # Flush any remaining bins - if all_bins and waterfall_running: - max_bins = int(waterfall_config.get('max_bins') or 0) - bins_to_send = all_bins - if max_bins > 0 and len(bins_to_send) > max_bins: - bins_to_send = _downsample_bins(bins_to_send, max_bins) + if all_bins and waterfall_running: + max_bins = int(waterfall_config.get('max_bins') or 0) + bins_to_send = all_bins + if max_bins > 0 and len(bins_to_send) > max_bins: + bins_to_send = _downsample_bins(bins_to_send, max_bins) msg = { 'type': 'waterfall_sweep', 'start_freq': sweep_start_hz / 1e6, @@ -1915,17 +1917,17 @@ def _waterfall_loop(): 'bins': bins_to_send, 'timestamp': datetime.now().isoformat(), } - try: - waterfall_queue.put_nowait(msg) - except queue.Full: - pass - - if waterfall_running and not received_any: - _queue_waterfall_error('No waterfall FFT data received from rtl_power') - - except Exception as e: - logger.error(f"Waterfall loop error: {e}") - _queue_waterfall_error(f"Waterfall loop error: {e}") + try: + waterfall_queue.put_nowait(msg) + except queue.Full: + pass + + if waterfall_running and not received_any: + _queue_waterfall_error('No waterfall FFT data received from rtl_power') + + except Exception as e: + logger.error(f"Waterfall loop error: {e}") + _queue_waterfall_error(f"Waterfall loop error: {e}") finally: waterfall_running = False if waterfall_process and waterfall_process.poll() is None: @@ -1967,14 +1969,14 @@ def start_waterfall() -> Response: """Start the waterfall/spectrogram display.""" global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device - with waterfall_lock: - if waterfall_running: - return jsonify({ - 'status': 'started', - 'already_running': True, - 'message': 'Waterfall already running', - 'config': waterfall_config, - }) + with waterfall_lock: + if waterfall_running: + return jsonify({ + 'status': 'started', + 'already_running': True, + 'message': 'Waterfall already running', + 'config': waterfall_config, + }) if not find_rtl_power(): return jsonify({'status': 'error', 'message': 'rtl_power not found'}), 503 @@ -2031,25 +2033,25 @@ def stop_waterfall() -> Response: return jsonify({'status': 'stopped'}) -@receiver_bp.route('/waterfall/stream') -def stream_waterfall() -> Response: - """SSE stream for waterfall data.""" - def _on_msg(msg: dict[str, Any]) -> None: - process_event('waterfall', msg, msg.get('type')) - - response = Response( - sse_stream_fanout( - source_queue=waterfall_queue, - channel_key='receiver_waterfall', - timeout=SSE_QUEUE_TIMEOUT, - keepalive_interval=SSE_KEEPALIVE_INTERVAL, - on_message=_on_msg, - ), - mimetype='text/event-stream', - ) - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' - return response +@receiver_bp.route('/waterfall/stream') +def stream_waterfall() -> Response: + """SSE stream for waterfall data.""" + def _on_msg(msg: dict[str, Any]) -> None: + process_event('waterfall', msg, msg.get('type')) + + response = Response( + sse_stream_fanout( + source_queue=waterfall_queue, + channel_key='receiver_waterfall', + timeout=SSE_QUEUE_TIMEOUT, + keepalive_interval=SSE_KEEPALIVE_INTERVAL, + on_message=_on_msg, + ), + mimetype='text/event-stream', + ) + response.headers['Cache-Control'] = 'no-cache' + response.headers['X-Accel-Buffering'] = 'no' + return response def _downsample_bins(values: list[float], target: int) -> list[float]: """Downsample bins to a target length using simple averaging.""" if target <= 0 or len(values) <= target: diff --git a/static/js/modes/waterfall.js b/static/js/modes/waterfall.js index 0074ebc..879e146 100644 --- a/static/js/modes/waterfall.js +++ b/static/js/modes/waterfall.js @@ -900,19 +900,27 @@ const Waterfall = (function () { resolve(ok); }; - const onReady = () => finish(true); + // Only treat actual playback as success. `loadeddata` and + // `canplay` fire when just the WAV header arrives — before any + // real audio samples have been decoded — which caused the + // monitor to report "started" while the stream was still silent. + const onReady = () => { + if (player.currentTime > 0 || (!player.paused && player.readyState >= 4)) { + finish(true); + } + }; const onFail = () => finish(false); - const events = ['playing', 'timeupdate', 'canplay', 'loadeddata']; + const events = ['playing', 'timeupdate']; const failEvents = ['error', 'abort', 'stalled', 'ended']; events.forEach((evt) => player.addEventListener(evt, onReady)); failEvents.forEach((evt) => player.addEventListener(evt, onFail)); timer = setTimeout(() => { - finish(!player.paused && (player.currentTime > 0 || player.readyState >= 2)); + finish(!player.paused && player.currentTime > 0); }, timeoutMs); - if (!player.paused && (player.currentTime > 0 || player.readyState >= 2)) { + if (!player.paused && player.currentTime > 0) { finish(true); } }); @@ -2571,6 +2579,7 @@ const Waterfall = (function () { } if (attempt < maxAttempts) { + _setMonitorState(`Waiting for audio stream (attempt ${attempt}/${maxAttempts})...`); await _wait(220 * attempt); continue; } @@ -2813,12 +2822,9 @@ const Waterfall = (function () { clearTimeout(_monitorRetuneTimer); _audioConnectNonce += 1; - try { - await fetch('/receiver/audio/stop', { method: 'POST' }); - } catch (_) { - // Ignore backend stop errors - } - + // Immediately pause audio and update the UI so the user gets instant + // feedback. The backend cleanup (which can block for 1-2 s while the + // SDR process group is reaped) happens afterwards. _stopSmeter(); _setUnlockVisible(false); _audioUnlockRequired = false; @@ -2836,6 +2842,13 @@ const Waterfall = (function () { _setVisualStatus('READY'); } + // Backend stop is fire-and-forget; UI is already updated above. + try { + await fetch('/receiver/audio/stop', { method: 'POST' }); + } catch (_) { + // Ignore backend stop errors + } + if (resumeWaterfall && _active) { _resumeWaterfallAfterMonitor = false; await start(); diff --git a/templates/index.html b/templates/index.html index c09744c..c41c28b 100644 --- a/templates/index.html +++ b/templates/index.html @@ -2936,7 +2936,7 @@ - +