diff --git a/routes/listening_post.py b/routes/listening_post.py index 092f262..3633153 100644 --- a/routes/listening_post.py +++ b/routes/listening_post.py @@ -96,16 +96,27 @@ def find_rx_fm() -> str | None: return shutil.which('rx_fm') -def find_ffmpeg() -> str | None: - """Find ffmpeg for audio encoding.""" - return shutil.which('ffmpeg') - - - - -def add_activity_log(event_type: str, frequency: float, details: str = ''): - """Add entry to activity log.""" - with activity_log_lock: +def find_ffmpeg() -> str | None: + """Find ffmpeg for audio encoding.""" + return shutil.which('ffmpeg') + + +VALID_MODULATIONS = ['fm', 'wfm', 'am', 'usb', 'lsb'] + + +def normalize_modulation(value: str) -> str: + """Normalize and validate modulation string.""" + mod = str(value or '').lower().strip() + if mod not in VALID_MODULATIONS: + raise ValueError(f'Invalid modulation. Use: {", ".join(VALID_MODULATIONS)}') + return mod + + + + +def add_activity_log(event_type: str, frequency: float, details: str = ''): + """Add entry to activity log.""" + with activity_log_lock: entry = { 'timestamp': datetime.utcnow().isoformat() + 'Z', 'type': event_type, @@ -723,56 +734,106 @@ def _start_audio_stream(frequency: float, modulation: str): 'pipe:1' ] - try: - # Use shell pipe for reliable streaming - # Log stderr to temp files for error diagnosis - rtl_stderr_log = '/tmp/rtl_fm_stderr.log' - ffmpeg_stderr_log = '/tmp/ffmpeg_stderr.log' - shell_cmd = f"{' '.join(sdr_cmd)} 2>{rtl_stderr_log} | {' '.join(encoder_cmd)} 2>{ffmpeg_stderr_log}" - logger.info(f"Starting audio: {frequency} MHz, mod={modulation}, device={scanner_config['device']}") - - # Retry loop for USB device contention (device may not be - # released immediately after a previous process exits) - max_attempts = 3 - for attempt in range(max_attempts): - audio_rtl_process = None # Not used in shell mode - audio_process = subprocess.Popen( - shell_cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=0, - start_new_session=True # Create new process group for clean shutdown - ) - - # Brief delay to check if process started successfully - time.sleep(0.3) - - if audio_process.poll() is not None: - # Read stderr from temp files - rtl_stderr = '' - ffmpeg_stderr = '' - try: - with open(rtl_stderr_log, 'r') as f: - rtl_stderr = f.read().strip() - except Exception: - pass - try: - with open(ffmpeg_stderr_log, 'r') as f: - ffmpeg_stderr = f.read().strip() - except Exception: - pass - - if 'usb_claim_interface' in rtl_stderr and attempt < max_attempts - 1: - logger.warning(f"USB device busy (attempt {attempt + 1}/{max_attempts}), waiting for release...") - time.sleep(1.0) - continue - - logger.error(f"Audio pipeline exited immediately. rtl_fm stderr: {rtl_stderr}, ffmpeg stderr: {ffmpeg_stderr}") - return - - # Pipeline started successfully - break + try: + # Use subprocess piping for reliable streaming. + # Log stderr to temp files for error diagnosis. + rtl_stderr_log = '/tmp/rtl_fm_stderr.log' + ffmpeg_stderr_log = '/tmp/ffmpeg_stderr.log' + logger.info(f"Starting audio: {frequency} MHz, mod={modulation}, device={scanner_config['device']}") + + # Retry loop for USB device contention (device may not be + # released immediately after a previous process exits) + max_attempts = 3 + for attempt in range(max_attempts): + audio_rtl_process = None + audio_process = None + rtl_err_handle = None + ffmpeg_err_handle = None + try: + rtl_err_handle = open(rtl_stderr_log, 'w') + ffmpeg_err_handle = open(ffmpeg_stderr_log, 'w') + audio_rtl_process = subprocess.Popen( + sdr_cmd, + stdout=subprocess.PIPE, + stderr=rtl_err_handle, + bufsize=0, + start_new_session=True # Create new process group for clean shutdown + ) + audio_process = subprocess.Popen( + encoder_cmd, + stdin=audio_rtl_process.stdout, + stdout=subprocess.PIPE, + stderr=ffmpeg_err_handle, + bufsize=0, + start_new_session=True # Create new process group for clean shutdown + ) + if audio_rtl_process.stdout: + audio_rtl_process.stdout.close() + finally: + if rtl_err_handle: + rtl_err_handle.close() + if ffmpeg_err_handle: + ffmpeg_err_handle.close() + + # Brief delay to check if process started successfully + time.sleep(0.3) + + if (audio_rtl_process and audio_rtl_process.poll() is not None) or ( + audio_process and audio_process.poll() is not None + ): + # Read stderr from temp files + rtl_stderr = '' + ffmpeg_stderr = '' + try: + with open(rtl_stderr_log, 'r') as f: + rtl_stderr = f.read().strip() + except Exception: + pass + try: + with open(ffmpeg_stderr_log, 'r') as f: + ffmpeg_stderr = f.read().strip() + except Exception: + pass + + if 'usb_claim_interface' in rtl_stderr and attempt < max_attempts - 1: + logger.warning(f"USB device busy (attempt {attempt + 1}/{max_attempts}), waiting for release...") + if audio_process: + try: + audio_process.terminate() + audio_process.wait(timeout=0.5) + except Exception: + pass + if audio_rtl_process: + try: + audio_rtl_process.terminate() + audio_rtl_process.wait(timeout=0.5) + except Exception: + pass + time.sleep(1.0) + continue + + if audio_process and audio_process.poll() is None: + try: + audio_process.terminate() + audio_process.wait(timeout=0.5) + except Exception: + pass + if audio_rtl_process and audio_rtl_process.poll() is None: + try: + audio_rtl_process.terminate() + audio_rtl_process.wait(timeout=0.5) + except Exception: + pass + audio_process = None + audio_rtl_process = None + + logger.error( + f"Audio pipeline exited immediately. rtl_fm stderr: {rtl_stderr}, ffmpeg stderr: {ffmpeg_stderr}" + ) + return + + # Pipeline started successfully + break # Validate that audio is producing data quickly try: @@ -797,28 +858,38 @@ def _stop_audio_stream(): _stop_audio_stream_internal() -def _stop_audio_stream_internal(): - """Internal stop (must hold lock).""" - global audio_process, audio_rtl_process, audio_running, audio_frequency - - # Set flag first to stop any streaming - audio_running = False - audio_frequency = 0.0 - - # Kill the shell process and its children - if audio_process: - try: - # Kill entire process group (rtl_fm, ffmpeg, shell) - try: - os.killpg(os.getpgid(audio_process.pid), signal.SIGKILL) - except (ProcessLookupError, PermissionError): - audio_process.kill() - audio_process.wait(timeout=0.5) - except: - pass - - audio_process = None - audio_rtl_process = None +def _stop_audio_stream_internal(): + """Internal stop (must hold lock).""" + global audio_process, audio_rtl_process, audio_running, audio_frequency + + # Set flag first to stop any streaming + audio_running = False + audio_frequency = 0.0 + + # Kill the pipeline processes and their groups + if audio_process: + try: + # Kill entire process group (SDR demod + ffmpeg) + try: + os.killpg(os.getpgid(audio_process.pid), signal.SIGKILL) + except (ProcessLookupError, PermissionError): + audio_process.kill() + audio_process.wait(timeout=0.5) + except Exception: + pass + + if audio_rtl_process: + try: + try: + os.killpg(os.getpgid(audio_rtl_process.pid), signal.SIGKILL) + except (ProcessLookupError, PermissionError): + audio_rtl_process.kill() + audio_rtl_process.wait(timeout=0.5) + except Exception: + pass + + audio_process = None + audio_rtl_process = None # Kill any orphaned rtl_fm, rtl_power, and ffmpeg processes for proc_pattern in ['rtl_fm', 'rtl_power']: @@ -891,7 +962,7 @@ def start_scanner() -> Response: scanner_config['start_freq'] = float(data.get('start_freq', 88.0)) scanner_config['end_freq'] = float(data.get('end_freq', 108.0)) scanner_config['step'] = float(data.get('step', 0.1)) - scanner_config['modulation'] = str(data.get('modulation', 'wfm')).lower() + scanner_config['modulation'] = normalize_modulation(data.get('modulation', 'wfm')) scanner_config['squelch'] = int(data.get('squelch', 0)) scanner_config['dwell_time'] = float(data.get('dwell_time', 3.0)) scanner_config['scan_delay'] = float(data.get('scan_delay', 0.5)) @@ -1073,9 +1144,15 @@ def update_scanner_config() -> Response: scanner_config['dwell_time'] = int(data['dwell_time']) updated.append(f"dwell={data['dwell_time']}s") - if 'modulation' in data: - scanner_config['modulation'] = str(data['modulation']).lower() - updated.append(f"mod={data['modulation']}") + if 'modulation' in data: + try: + scanner_config['modulation'] = normalize_modulation(data['modulation']) + updated.append(f"mod={data['modulation']}") + except (ValueError, TypeError) as e: + return jsonify({ + 'status': 'error', + 'message': str(e) + }), 400 if updated: logger.info(f"Scanner config updated: {', '.join(updated)}") @@ -1197,7 +1274,7 @@ def start_audio() -> Response: try: frequency = float(data.get('frequency', 0)) - modulation = str(data.get('modulation', 'wfm')).lower() + modulation = normalize_modulation(data.get('modulation', 'wfm')) squelch = int(data.get('squelch', 0)) gain = int(data.get('gain', 40)) device = int(data.get('device', 0)) @@ -1214,13 +1291,6 @@ def start_audio() -> Response: 'message': 'frequency is required' }), 400 - valid_mods = ['fm', 'wfm', 'am', 'usb', 'lsb'] - if modulation not in valid_mods: - return jsonify({ - 'status': 'error', - 'message': f'Invalid modulation. Use: {", ".join(valid_mods)}' - }), 400 - valid_sdr_types = ['rtlsdr', 'hackrf', 'airspy', 'limesdr', 'sdrplay'] if sdr_type not in valid_sdr_types: return jsonify({