Release v1.3.0: Multi-source downloads, audio analyzer resilience, mobile improvements
Major Features: - Multi-source download system (Soulseek/Lidarr with fallback) - Configurable enrichment speed control (1-5x) - Mobile touch drag support for seek sliders - iOS PWA media controls (Control Center, Lock Screen) - Artist name alias resolution via Last.fm - Circuit breaker pattern for audio analysis Critical Fixes: - Audio analyzer stability (non-ASCII, BrokenProcessPool, OOM) - Discovery system race conditions and import failures - Radio decade categorization using originalYear - LastFM API response normalization - Mood bucket infinite loop prevention Security: - Bull Board admin authentication - Lidarr webhook signature verification - JWT token expiration and refresh - Encryption key validation on startup Closes #2, #6, #9, #13, #21, #26, #31, #34, #35, #37, #40, #43
This commit is contained in:
@@ -6,6 +6,17 @@ FROM ubuntu:20.04
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
ENV TZ=UTC
|
||||
|
||||
# CPU thread limiting for TensorFlow and numerical libraries
|
||||
# Prevents each worker from using all CPU cores
|
||||
# Override with docker-compose environment variables
|
||||
ENV TF_NUM_INTRAOP_THREADS=1 \
|
||||
TF_NUM_INTEROP_THREADS=1 \
|
||||
OMP_NUM_THREADS=1 \
|
||||
OPENBLAS_NUM_THREADS=1 \
|
||||
MKL_NUM_THREADS=1 \
|
||||
NUMEXPR_MAX_THREADS=1 \
|
||||
THREADS_PER_WORKER=1
|
||||
|
||||
# Install system dependencies
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
python3 \
|
||||
|
||||
@@ -1,4 +1,39 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Audio analyzer service - Essentia-based analysis with TensorFlow ML models"""
|
||||
|
||||
# ============================================================================
|
||||
# CRITICAL: TensorFlow threading MUST be configured before any imports
|
||||
# Environment variables are read by TensorFlow C++ runtime before initialization
|
||||
# ============================================================================
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Get thread configuration from environment (default to 1 for safety)
|
||||
THREADS_PER_WORKER = int(os.getenv('THREADS_PER_WORKER', '1'))
|
||||
|
||||
# Configure TensorFlow threading via environment variables
|
||||
# These are read by TensorFlow C++ runtime before thread pool initialization
|
||||
# Must be set BEFORE any TensorFlow/Essentia imports load TensorFlow
|
||||
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Reduce TF logging noise
|
||||
os.environ['TF_NUM_INTRAOP_THREADS'] = str(THREADS_PER_WORKER) # Threads within ops
|
||||
os.environ['TF_NUM_INTEROP_THREADS'] = '1' # Serialize op scheduling
|
||||
|
||||
# Also set NumPy/BLAS/OpenMP limits for non-TensorFlow operations
|
||||
os.environ['OMP_NUM_THREADS'] = str(THREADS_PER_WORKER)
|
||||
os.environ['OPENBLAS_NUM_THREADS'] = str(THREADS_PER_WORKER)
|
||||
os.environ['MKL_NUM_THREADS'] = str(THREADS_PER_WORKER)
|
||||
os.environ['NUMEXPR_MAX_THREADS'] = str(THREADS_PER_WORKER)
|
||||
|
||||
# Log thread configuration on startup
|
||||
print("=" * 80, file=sys.stderr)
|
||||
print("AUDIO ANALYZER THREAD CONFIGURATION", file=sys.stderr)
|
||||
print("=" * 80, file=sys.stderr)
|
||||
print(f"TF_NUM_INTRAOP_THREADS: {THREADS_PER_WORKER}", file=sys.stderr)
|
||||
print(f"TF_NUM_INTEROP_THREADS: 1", file=sys.stderr)
|
||||
print(f"OpenMP/BLAS threads: {THREADS_PER_WORKER}", file=sys.stderr)
|
||||
print(f"Expected CPU usage: ~{THREADS_PER_WORKER * 100 + 100}% per worker", file=sys.stderr)
|
||||
print("=" * 80, file=sys.stderr)
|
||||
|
||||
"""
|
||||
Essentia Audio Analyzer Service - Enhanced Vibe Matching
|
||||
|
||||
@@ -18,8 +53,9 @@ Two analysis modes:
|
||||
It connects to Redis for job queue and PostgreSQL for storing results.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
# NOW safe to import other dependencies
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
@@ -30,6 +66,19 @@ import numpy as np
|
||||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||||
import multiprocessing
|
||||
|
||||
# BrokenProcessPool was added in Python 3.9, provide compatibility for Python 3.8
|
||||
try:
|
||||
from concurrent.futures import BrokenProcessPool
|
||||
except ImportError:
|
||||
# Python 3.8 fallback: use the internal class or create a compatible exception
|
||||
try:
|
||||
from concurrent.futures.process import BrokenProcessPool
|
||||
except ImportError:
|
||||
# If still not available, create a compatible exception class
|
||||
class BrokenProcessPool(Exception):
|
||||
"""Compatibility shim for Python < 3.9"""
|
||||
pass
|
||||
|
||||
# Force spawn mode for TensorFlow compatibility (must be called before any multiprocessing)
|
||||
try:
|
||||
multiprocessing.set_start_method('spawn', force=True)
|
||||
@@ -38,7 +87,7 @@ except RuntimeError:
|
||||
|
||||
import redis
|
||||
import psycopg2
|
||||
from psycopg2.extras import RealDictCursor
|
||||
from psycopg2.extras import RealDictCursor, Json
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
@@ -85,7 +134,47 @@ def _get_auto_workers() -> int:
|
||||
auto_workers = max(2, min(8, cpu_count // 2))
|
||||
return auto_workers
|
||||
|
||||
NUM_WORKERS = int(os.getenv('NUM_WORKERS', str(_get_auto_workers())))
|
||||
|
||||
def _get_workers_from_db() -> int:
|
||||
"""
|
||||
Fetch worker count from SystemSettings table.
|
||||
Falls back to env var or default if database query fails.
|
||||
"""
|
||||
try:
|
||||
db = DatabaseConnection(DATABASE_URL)
|
||||
db.connect()
|
||||
cursor = db.get_cursor()
|
||||
|
||||
cursor.execute("""
|
||||
SELECT "audioAnalyzerWorkers"
|
||||
FROM "SystemSettings"
|
||||
WHERE id = 'default'
|
||||
LIMIT 1
|
||||
""")
|
||||
|
||||
result = cursor.fetchone()
|
||||
cursor.close()
|
||||
db.close()
|
||||
|
||||
if result and result['audioAnalyzerWorkers'] is not None:
|
||||
workers = int(result['audioAnalyzerWorkers'])
|
||||
# Validate range (1-8)
|
||||
workers = max(1, min(8, workers))
|
||||
logger.info(f"Loaded worker count from database: {workers}")
|
||||
return workers
|
||||
else:
|
||||
logger.info("No worker count found in database, using env var or default")
|
||||
return int(os.getenv('NUM_WORKERS', str(DEFAULT_WORKERS)))
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch worker count from database: {e}")
|
||||
logger.info("Falling back to env var or default")
|
||||
return int(os.getenv('NUM_WORKERS', str(DEFAULT_WORKERS)))
|
||||
# Conservative default: 2 workers (stable on any system)
|
||||
# Previous default used auto-scaling which could cause OOM on memory-constrained systems
|
||||
DEFAULT_WORKERS = 2
|
||||
# Try to load from database first, fall back to env var or default
|
||||
NUM_WORKERS = _get_workers_from_db()
|
||||
ESSENTIA_VERSION = '2.1b6-enhanced-v2'
|
||||
|
||||
# Retry configuration
|
||||
@@ -96,6 +185,9 @@ STALE_PROCESSING_MINUTES = int(os.getenv('STALE_PROCESSING_MINUTES', '10')) # R
|
||||
ANALYSIS_QUEUE = 'audio:analysis:queue'
|
||||
ANALYSIS_PROCESSING = 'audio:analysis:processing'
|
||||
|
||||
# Control channel for enrichment coordination
|
||||
CONTROL_CHANNEL = 'audio:analysis:control'
|
||||
|
||||
# Model paths (pre-packaged in Docker image)
|
||||
MODEL_DIR = '/app/models'
|
||||
|
||||
@@ -126,12 +218,18 @@ class DatabaseConnection:
|
||||
self.conn = None
|
||||
|
||||
def connect(self):
|
||||
"""Establish database connection"""
|
||||
"""Establish database connection with explicit UTF-8 encoding"""
|
||||
if not self.url:
|
||||
raise ValueError("DATABASE_URL not set")
|
||||
self.conn = psycopg2.connect(self.url)
|
||||
|
||||
# Ensure UTF-8 encoding for international file paths (Issue #6 fix)
|
||||
self.conn = psycopg2.connect(
|
||||
self.url,
|
||||
options="-c client_encoding=UTF8"
|
||||
)
|
||||
self.conn.set_client_encoding('UTF8')
|
||||
self.conn.autocommit = False
|
||||
logger.info("Connected to PostgreSQL")
|
||||
logger.info("Connected to PostgreSQL with UTF-8 encoding")
|
||||
|
||||
def get_cursor(self):
|
||||
"""Get a database cursor"""
|
||||
@@ -201,6 +299,8 @@ class AudioAnalyzer:
|
||||
Architecture:
|
||||
1. Base MusiCNN model generates embeddings from audio
|
||||
2. Classification head models take embeddings and output predictions
|
||||
|
||||
If models are missing, gracefully fall back to Standard mode.
|
||||
"""
|
||||
if not TF_MODELS_AVAILABLE:
|
||||
logger.info("TensorFlow not available - using Standard mode")
|
||||
@@ -212,13 +312,24 @@ class AudioAnalyzer:
|
||||
|
||||
# First, load the base MusiCNN embedding model
|
||||
if os.path.exists(MODELS['musicnn']):
|
||||
self.musicnn_model = TensorflowPredictMusiCNN(
|
||||
graphFilename=MODELS['musicnn'],
|
||||
output="model/dense/BiasAdd" # Embedding layer output
|
||||
)
|
||||
logger.info("Loaded base MusiCNN model for embeddings")
|
||||
try:
|
||||
self.musicnn_model = TensorflowPredictMusiCNN(
|
||||
graphFilename=MODELS['musicnn'],
|
||||
output="model/dense/BiasAdd" # Embedding layer output
|
||||
)
|
||||
logger.info("Loaded base MusiCNN model for embeddings")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load MusiCNN model: {e}")
|
||||
logger.info("Falling back to Standard mode (heuristic-based analysis)")
|
||||
self.enhanced_mode = False
|
||||
return
|
||||
else:
|
||||
logger.error(f"Base MusiCNN model not found: {MODELS['musicnn']}")
|
||||
logger.warning(f"Base MusiCNN model not found at: {MODELS['musicnn']}")
|
||||
logger.info("This is normal if models haven't been downloaded yet.")
|
||||
logger.info("Falling back to Standard mode (heuristic-based analysis)")
|
||||
logger.info("Standard mode still provides BPM, key, energy, and mood detection,")
|
||||
logger.info("but uses audio features instead of ML predictions.")
|
||||
self.enhanced_mode = False
|
||||
return
|
||||
|
||||
# Load classification head models
|
||||
@@ -278,6 +389,70 @@ class AudioAnalyzer:
|
||||
logger.error(f"Failed to load audio {file_path}: {e}")
|
||||
return None
|
||||
|
||||
def validate_audio(self, audio, file_path: str) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Validate audio before analysis to detect edge cases that cause crashes.
|
||||
|
||||
Returns:
|
||||
(is_valid, error_message) - error_message is None if valid
|
||||
|
||||
Checks:
|
||||
1. Duration >= 5 seconds (very short files cause rhythm extraction issues)
|
||||
2. Not mostly silence (>80% silence = likely corrupted or blank file)
|
||||
3. Basic signal statistics (detect NaN/Inf corruption)
|
||||
"""
|
||||
try:
|
||||
# Check 1: Minimum duration
|
||||
sample_rate = 44100 # Assumed sample rate for validation
|
||||
duration = len(audio) / sample_rate
|
||||
|
||||
if duration < 5.0:
|
||||
return (False, f"Audio too short: {duration:.1f}s (minimum 5s)")
|
||||
|
||||
# Check 2: Signal statistics (detect corruption)
|
||||
if len(audio) == 0:
|
||||
return (False, "Audio is empty")
|
||||
|
||||
# Check for NaN or Inf values
|
||||
if np.any(np.isnan(audio)) or np.any(np.isinf(audio)):
|
||||
return (False, "Audio contains NaN or Inf values (corrupted)")
|
||||
|
||||
# Check 3: Silence detection
|
||||
# Calculate RMS energy across the entire audio
|
||||
try:
|
||||
rms = es.RMS()
|
||||
frame_size = 2048
|
||||
hop_size = 1024
|
||||
silent_frames = 0
|
||||
total_frames = 0
|
||||
|
||||
# Silence threshold: RMS < 0.001 (very quiet)
|
||||
silence_threshold = 0.001
|
||||
|
||||
for i in range(0, len(audio) - frame_size, hop_size):
|
||||
frame = audio[i:i + frame_size]
|
||||
frame_rms = rms(frame)
|
||||
total_frames += 1
|
||||
if frame_rms < silence_threshold:
|
||||
silent_frames += 1
|
||||
|
||||
if total_frames > 0:
|
||||
silence_ratio = silent_frames / total_frames
|
||||
if silence_ratio > 0.8:
|
||||
return (False, f"Audio is {silence_ratio*100:.0f}% silence (likely corrupted or blank)")
|
||||
|
||||
except Exception as silence_error:
|
||||
# Silence check failed - log but don't fail validation
|
||||
logger.warning(f"Silence detection failed for {file_path}: {silence_error}")
|
||||
|
||||
# All checks passed
|
||||
return (True, None)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Audio validation error for {file_path}: {e}")
|
||||
# On validation error, allow analysis to proceed (fail-open)
|
||||
return (True, None)
|
||||
|
||||
def analyze(self, file_path: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Analyze audio file and extract all features.
|
||||
@@ -344,21 +519,63 @@ class AudioAnalyzer:
|
||||
audio_16k = self.load_audio(file_path, 16000)
|
||||
|
||||
if audio_44k is None or audio_16k is None:
|
||||
result['_error'] = 'Failed to load audio file'
|
||||
return result
|
||||
|
||||
# Validate audio before analysis (Phase 2 defensive improvement)
|
||||
is_valid, validation_error = self.validate_audio(audio_44k, file_path)
|
||||
if not is_valid:
|
||||
logger.warning(f"Audio validation failed for {file_path}: {validation_error}")
|
||||
result['_error'] = validation_error
|
||||
return result
|
||||
|
||||
try:
|
||||
# === BASIC FEATURES (always extracted) ===
|
||||
|
||||
# Rhythm Analysis
|
||||
bpm, beats, beats_confidence, _, beats_intervals = self.rhythm_extractor(audio_44k)
|
||||
result['bpm'] = round(float(bpm), 1)
|
||||
result['beatsCount'] = len(beats)
|
||||
# Rhythm Analysis with defensive error handling (Issue #13 fix)
|
||||
try:
|
||||
bpm, beats, beats_confidence, _, beats_intervals = self.rhythm_extractor(audio_44k)
|
||||
result['bpm'] = round(float(bpm), 1)
|
||||
result['beatsCount'] = len(beats)
|
||||
except Exception as rhythm_error:
|
||||
# RhythmExtractor2013 can crash on edge cases (silence, corruption, very short files)
|
||||
logger.warning(f"RhythmExtractor2013 failed, using fallback BPM estimation: {rhythm_error}")
|
||||
|
||||
# Fallback: Simple onset-based BPM estimation
|
||||
try:
|
||||
# Use OnsetRate to estimate tempo from percussive onsets
|
||||
onset_detector = es.OnsetRate()
|
||||
onset_rate, _ = onset_detector(audio_44k)
|
||||
# OnsetRate returns onsets/second, convert to BPM estimate
|
||||
# Typical: 1-3 onsets/sec = 60-180 BPM
|
||||
bpm = max(60, min(180, onset_rate * 60))
|
||||
result['bpm'] = round(float(bpm), 1)
|
||||
result['beatsCount'] = 0 # Can't reliably count beats without RhythmExtractor
|
||||
logger.info(f"Fallback BPM estimate: {result['bpm']} (from onset rate: {onset_rate:.2f}/sec)")
|
||||
except Exception as fallback_error:
|
||||
# Even fallback failed - use neutral default
|
||||
logger.warning(f"Onset-based fallback also failed: {fallback_error}")
|
||||
bpm = 120.0 # Neutral default tempo
|
||||
result['bpm'] = 120.0
|
||||
result['beatsCount'] = 0
|
||||
logger.info("Using default BPM: 120.0")
|
||||
|
||||
# Key Detection
|
||||
key, scale, strength = self.key_extractor(audio_44k)
|
||||
result['key'] = key
|
||||
result['keyScale'] = scale
|
||||
result['keyStrength'] = round(float(strength), 3)
|
||||
# Key Detection with defensive error handling
|
||||
try:
|
||||
key, scale, strength = self.key_extractor(audio_44k)
|
||||
result['key'] = key
|
||||
result['keyScale'] = scale
|
||||
result['keyStrength'] = round(float(strength), 3)
|
||||
except Exception as key_error:
|
||||
# Key extraction can fail on edge cases, use defaults
|
||||
logger.warning(f"Key extraction failed: {key_error}")
|
||||
key = 'C'
|
||||
scale = 'major'
|
||||
strength = 0.0
|
||||
result['key'] = key
|
||||
result['keyScale'] = scale
|
||||
result['keyStrength'] = 0.0
|
||||
logger.info("Using default key: C major")
|
||||
|
||||
# Energy & Dynamics - using RMS for proper 0-1 energy
|
||||
rms_values = []
|
||||
@@ -779,10 +996,23 @@ class AudioAnalyzer:
|
||||
_process_analyzer = None
|
||||
|
||||
def _init_worker_process():
|
||||
"""Initialize the analyzer for a worker process"""
|
||||
"""
|
||||
Initialize the analyzer for a worker process.
|
||||
|
||||
If model loading fails, the analyzer will fall back to Standard mode.
|
||||
This prevents worker crashes from breaking the entire process pool.
|
||||
"""
|
||||
global _process_analyzer
|
||||
_process_analyzer = AudioAnalyzer()
|
||||
logger.info(f"Worker process {os.getpid()} initialized with analyzer")
|
||||
try:
|
||||
_process_analyzer = AudioAnalyzer()
|
||||
mode = "Enhanced" if _process_analyzer.enhanced_mode else "Standard"
|
||||
logger.info(f"Worker process {os.getpid()} initialized with analyzer ({mode} mode)")
|
||||
except Exception as e:
|
||||
logger.error(f"Worker initialization error: {e}")
|
||||
logger.error("This worker will not be able to process tracks.")
|
||||
logger.error(f"Traceback: {traceback.format_exc()}")
|
||||
# Re-raise to kill this worker - better than silent failures
|
||||
raise
|
||||
|
||||
def _analyze_track_in_process(args: Tuple[str, str]) -> Tuple[str, str, Dict[str, Any]]:
|
||||
"""
|
||||
@@ -793,10 +1023,20 @@ def _analyze_track_in_process(args: Tuple[str, str]) -> Tuple[str, str, Dict[str
|
||||
track_id, file_path = args
|
||||
|
||||
try:
|
||||
# Ensure path is properly decoded (Issue #6 fix)
|
||||
if isinstance(file_path, bytes):
|
||||
file_path = file_path.decode('utf-8', errors='replace')
|
||||
|
||||
# Normalize path separators (Windows paths -> Unix)
|
||||
normalized_path = file_path.replace('\\', '/')
|
||||
full_path = os.path.join(MUSIC_PATH, normalized_path)
|
||||
|
||||
# Use os.fsencode/fsdecode for filesystem-safe encoding
|
||||
try:
|
||||
full_path = os.fsdecode(os.fsencode(full_path))
|
||||
except (UnicodeError, AttributeError):
|
||||
return (track_id, file_path, {'_error': 'Invalid characters in file path'})
|
||||
|
||||
if not os.path.exists(full_path):
|
||||
return (track_id, file_path, {'_error': 'File not found'})
|
||||
|
||||
@@ -804,6 +1044,9 @@ def _analyze_track_in_process(args: Tuple[str, str]) -> Tuple[str, str, Dict[str
|
||||
features = _process_analyzer.analyze(full_path)
|
||||
return (track_id, file_path, features)
|
||||
|
||||
except UnicodeDecodeError as e:
|
||||
logger.error(f"UTF-8 decoding error for track {track_id}: {e}")
|
||||
return (track_id, file_path, {'_error': f'UTF-8 encoding error: {e}'})
|
||||
except Exception as e:
|
||||
logger.error(f"Analysis error for {file_path}: {e}")
|
||||
return (track_id, file_path, {'_error': str(e)})
|
||||
@@ -818,19 +1061,159 @@ class AnalysisWorker:
|
||||
self.running = False
|
||||
self.executor = None
|
||||
self.consecutive_empty = 0
|
||||
self._tracks_since_refresh = 0 # Track count for periodic pool refresh
|
||||
self.is_paused = False # Enrichment control: pause state
|
||||
self.pubsub = None # Redis pub/sub for control signals
|
||||
self._setup_control_channel()
|
||||
|
||||
def _setup_control_channel(self):
|
||||
"""Subscribe to control channel for pause/resume/stop signals"""
|
||||
try:
|
||||
self.pubsub = self.redis.pubsub()
|
||||
self.pubsub.subscribe(CONTROL_CHANNEL)
|
||||
logger.info(f"Subscribed to control channel: {CONTROL_CHANNEL}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to subscribe to control channel: {e}")
|
||||
self.pubsub = None
|
||||
|
||||
def _check_control_signals(self):
|
||||
"""Check for pause/resume/stop/set_workers control signals (non-blocking)"""
|
||||
if not self.pubsub:
|
||||
return
|
||||
|
||||
try:
|
||||
message = self.pubsub.get_message(ignore_subscribe_messages=True, timeout=0.001)
|
||||
if message and message['type'] == 'message':
|
||||
data = message['data'].decode('utf-8') if isinstance(message['data'], bytes) else message['data']
|
||||
|
||||
# Try to parse as JSON for structured commands
|
||||
try:
|
||||
cmd = json.loads(data)
|
||||
if isinstance(cmd, dict) and cmd.get('command') == 'set_workers':
|
||||
new_count = int(cmd.get('count', NUM_WORKERS))
|
||||
new_count = max(1, min(8, new_count))
|
||||
self._resize_worker_pool(new_count)
|
||||
return
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass # Not JSON, try as plain string
|
||||
|
||||
# Handle plain string signals (pause/resume/stop)
|
||||
logger.info(f"Received control signal: {data}")
|
||||
|
||||
if data == 'pause':
|
||||
self.is_paused = True
|
||||
logger.info("Audio analysis PAUSED")
|
||||
elif data == 'resume':
|
||||
self.is_paused = False
|
||||
logger.info("Audio analysis RESUMED")
|
||||
elif data == 'stop':
|
||||
self.running = False
|
||||
logger.info("Audio analysis STOPPING (graceful shutdown)")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error checking control signals: {e}")
|
||||
|
||||
def _resize_worker_pool(self, new_count: int):
|
||||
"""
|
||||
Resize the worker pool to a new count.
|
||||
Gracefully completes in-flight work before resizing.
|
||||
"""
|
||||
global NUM_WORKERS
|
||||
|
||||
if new_count == NUM_WORKERS:
|
||||
logger.info(f"Worker count unchanged at {new_count}")
|
||||
return
|
||||
|
||||
logger.info(f"Resizing worker pool: {NUM_WORKERS} -> {new_count} workers")
|
||||
|
||||
old_executor = self.executor
|
||||
NUM_WORKERS = new_count
|
||||
|
||||
# Create new pool first
|
||||
self.executor = ProcessPoolExecutor(
|
||||
max_workers=NUM_WORKERS,
|
||||
initializer=_init_worker_process
|
||||
)
|
||||
|
||||
# Gracefully shutdown old pool (wait for in-flight work)
|
||||
if old_executor:
|
||||
try:
|
||||
old_executor.shutdown(wait=True)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error shutting down old pool: {e}")
|
||||
|
||||
self._tracks_since_refresh = 0
|
||||
logger.info(f"Worker pool resized to {NUM_WORKERS} workers")
|
||||
|
||||
def _check_pool_health(self) -> bool:
|
||||
"""
|
||||
Check if the process pool is still healthy.
|
||||
Returns False if pool is broken or workers are dead.
|
||||
"""
|
||||
if self.executor is None:
|
||||
return False
|
||||
|
||||
# Check if pool is explicitly marked as broken
|
||||
if hasattr(self.executor, '_broken') and self.executor._broken:
|
||||
return False
|
||||
|
||||
# Try a no-op submission to verify pool works
|
||||
try:
|
||||
future = self.executor.submit(lambda: True)
|
||||
result = future.result(timeout=5)
|
||||
return result is True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _recreate_pool(self):
|
||||
"""
|
||||
Safely terminate the broken pool and create a new one.
|
||||
This is the critical recovery mechanism for Issue #21.
|
||||
"""
|
||||
logger.warning("Recreating process pool due to broken workers...")
|
||||
|
||||
# Attempt graceful shutdown first
|
||||
if self.executor:
|
||||
try:
|
||||
# Python 3.8 compatibility: cancel_futures parameter added in 3.9
|
||||
self.executor.shutdown(wait=False)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error during executor shutdown: {e}")
|
||||
|
||||
# Small delay to allow cleanup
|
||||
time.sleep(2)
|
||||
|
||||
# Create fresh pool
|
||||
self.executor = ProcessPoolExecutor(
|
||||
max_workers=NUM_WORKERS,
|
||||
initializer=_init_worker_process
|
||||
)
|
||||
|
||||
# Reset track counter
|
||||
self._tracks_since_refresh = 0
|
||||
|
||||
logger.info(f"Process pool recreated with {NUM_WORKERS} workers")
|
||||
|
||||
def _cleanup_stale_processing(self):
|
||||
"""Reset tracks stuck in 'processing' status (from crashed workers)"""
|
||||
cursor = self.db.get_cursor()
|
||||
try:
|
||||
# Reset tracks that have been "processing" for too long
|
||||
# Prefer analysisStartedAt if available, fallback to updatedAt
|
||||
cursor.execute("""
|
||||
UPDATE "Track"
|
||||
SET "analysisStatus" = 'pending'
|
||||
SET
|
||||
"analysisStatus" = 'pending',
|
||||
"analysisStartedAt" = NULL,
|
||||
"analysisRetryCount" = COALESCE("analysisRetryCount", 0) + 1
|
||||
WHERE "analysisStatus" = 'processing'
|
||||
AND "updatedAt" < NOW() - INTERVAL '%s minutes'
|
||||
AND (
|
||||
("analysisStartedAt" IS NOT NULL AND "analysisStartedAt" < NOW() - INTERVAL '%s minutes')
|
||||
OR
|
||||
("analysisStartedAt" IS NULL AND "updatedAt" < NOW() - INTERVAL '%s minutes')
|
||||
)
|
||||
AND COALESCE("analysisRetryCount", 0) < %s
|
||||
RETURNING id
|
||||
""", (STALE_PROCESSING_MINUTES,))
|
||||
""", (STALE_PROCESSING_MINUTES, STALE_PROCESSING_MINUTES, MAX_RETRIES))
|
||||
|
||||
reset_ids = cursor.fetchall()
|
||||
reset_count = len(reset_ids)
|
||||
@@ -896,7 +1279,7 @@ class AnalysisWorker:
|
||||
logger.info(f" Batch size: {BATCH_SIZE}")
|
||||
logger.info(f" CPU cores detected: {cpu_count}")
|
||||
logger.info(f" Auto-scaled workers: {auto_workers} (50% of cores, min 2, max 8)")
|
||||
logger.info(f" Active workers: {NUM_WORKERS}" + (" (from env)" if os.getenv('NUM_WORKERS') else " (auto)"))
|
||||
logger.info(f" Active workers: {NUM_WORKERS}" + (" (from env)" if os.getenv('NUM_WORKERS') else " (default: 2)"))
|
||||
logger.info(f" Max retries per track: {MAX_RETRIES}")
|
||||
logger.info(f" Stale processing timeout: {STALE_PROCESSING_MINUTES} minutes")
|
||||
logger.info(f" Essentia available: {ESSENTIA_AVAILABLE}")
|
||||
@@ -923,6 +1306,17 @@ class AnalysisWorker:
|
||||
try:
|
||||
while self.running:
|
||||
try:
|
||||
# Check for control signals (pause/resume/stop)
|
||||
self._check_control_signals()
|
||||
|
||||
# If paused, sleep and continue checking for resume
|
||||
if self.is_paused:
|
||||
logger.debug("Audio analysis paused, waiting for resume signal...")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
# Process work - health check removed as it was too aggressive
|
||||
# BrokenProcessPool exception handling below will catch real issues
|
||||
has_work = self.process_batch_parallel()
|
||||
|
||||
if not has_work:
|
||||
@@ -940,6 +1334,12 @@ class AnalysisWorker:
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Shutdown requested")
|
||||
self.running = False
|
||||
except BrokenProcessPool:
|
||||
# Explicit handling for BrokenProcessPool (Issue #21)
|
||||
logger.error("BrokenProcessPool detected, recreating pool...")
|
||||
self._recreate_pool()
|
||||
self._cleanup_stale_processing()
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"Worker error: {e}")
|
||||
traceback.print_exc()
|
||||
@@ -954,6 +1354,9 @@ class AnalysisWorker:
|
||||
self.db.connect()
|
||||
self._cleanup_stale_processing()
|
||||
self._retry_failed_tracks()
|
||||
# Also check if pool needs recreation
|
||||
if not self._check_pool_health():
|
||||
self._recreate_pool()
|
||||
except Exception as reconnect_err:
|
||||
logger.error(f"Recovery failed: {reconnect_err}")
|
||||
self.consecutive_empty = 0
|
||||
@@ -963,6 +1366,9 @@ class AnalysisWorker:
|
||||
if self.executor:
|
||||
self.executor.shutdown(wait=True)
|
||||
logger.info("Worker processes shut down")
|
||||
if self.pubsub:
|
||||
self.pubsub.close()
|
||||
logger.info("Control channel closed")
|
||||
self.db.close()
|
||||
logger.info("Worker stopped")
|
||||
|
||||
@@ -1143,9 +1549,18 @@ class AnalysisWorker:
|
||||
cursor.close()
|
||||
|
||||
def _save_failed(self, track_id: str, error: str):
|
||||
"""Mark track as failed and increment retry count"""
|
||||
"""Mark track as failed, increment retry count, and record in EnrichmentFailure table"""
|
||||
cursor = self.db.get_cursor()
|
||||
try:
|
||||
# Get track details for failure recording
|
||||
cursor.execute("""
|
||||
SELECT title, "filePath", "artistId"
|
||||
FROM "Track"
|
||||
WHERE id = %s
|
||||
""", (track_id,))
|
||||
track = cursor.fetchone()
|
||||
|
||||
# Update track status
|
||||
cursor.execute("""
|
||||
UPDATE "Track"
|
||||
SET
|
||||
@@ -1159,6 +1574,34 @@ class AnalysisWorker:
|
||||
result = cursor.fetchone()
|
||||
retry_count = result['analysisRetryCount'] if result else 0
|
||||
|
||||
# Record failure in EnrichmentFailure table for user visibility
|
||||
if track:
|
||||
cursor.execute("""
|
||||
INSERT INTO "EnrichmentFailure" (
|
||||
"entityType", "entityId", "entityName", "errorMessage",
|
||||
"lastFailedAt", "retryCount", metadata
|
||||
) VALUES (%s, %s, %s, %s, NOW(), 1, %s)
|
||||
ON CONFLICT ("entityType", "entityId")
|
||||
DO UPDATE SET
|
||||
"errorMessage" = EXCLUDED."errorMessage",
|
||||
"lastFailedAt" = NOW(),
|
||||
"retryCount" = "EnrichmentFailure"."retryCount" + 1,
|
||||
metadata = EXCLUDED.metadata,
|
||||
resolved = false,
|
||||
skipped = false
|
||||
""", (
|
||||
'audio',
|
||||
track_id,
|
||||
track.get('title', 'Unknown Track'),
|
||||
error[:500],
|
||||
Json({
|
||||
'filePath': track.get('filePath'),
|
||||
'artistId': track.get('artistId'),
|
||||
'retryCount': retry_count,
|
||||
'maxRetries': MAX_RETRIES
|
||||
})
|
||||
))
|
||||
|
||||
if retry_count >= MAX_RETRIES:
|
||||
logger.warning(f"Track {track_id} has permanently failed after {retry_count} attempts")
|
||||
else:
|
||||
@@ -1167,6 +1610,7 @@ class AnalysisWorker:
|
||||
self.db.commit()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to mark track as failed: {e}")
|
||||
logger.error(f"Traceback: {traceback.format_exc()}")
|
||||
self.db.rollback()
|
||||
finally:
|
||||
cursor.close()
|
||||
|
||||
Reference in New Issue
Block a user