mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
Use -o - (stdout) instead of -o /dev/null for audio output, as dsd-fme expects specific output targets. Remove -N flag which may cause issues in headless mode. Add stderr capture on pipeline failure for better error messages. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
389 lines
11 KiB
Python
389 lines
11 KiB
Python
"""DMR / P25 / Digital Voice decoding routes."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import queue
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Generator, Optional
|
|
|
|
from flask import Blueprint, jsonify, request, Response
|
|
|
|
import app as app_module
|
|
from utils.logging import get_logger
|
|
from utils.sse import format_sse
|
|
from utils.constants import (
|
|
SSE_QUEUE_TIMEOUT,
|
|
SSE_KEEPALIVE_INTERVAL,
|
|
QUEUE_MAX_SIZE,
|
|
)
|
|
|
|
logger = get_logger('intercept.dmr')
|
|
|
|
dmr_bp = Blueprint('dmr', __name__, url_prefix='/dmr')
|
|
|
|
# ============================================
|
|
# GLOBAL STATE
|
|
# ============================================
|
|
|
|
dmr_rtl_process: Optional[subprocess.Popen] = None
|
|
dmr_dsd_process: Optional[subprocess.Popen] = None
|
|
dmr_thread: Optional[threading.Thread] = None
|
|
dmr_running = False
|
|
dmr_lock = threading.Lock()
|
|
dmr_queue: queue.Queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
|
|
dmr_active_device: Optional[int] = None
|
|
|
|
VALID_PROTOCOLS = ['auto', 'dmr', 'p25', 'nxdn', 'dstar', 'provoice']
|
|
|
|
# Classic dsd flags
|
|
_DSD_PROTOCOL_FLAGS = {
|
|
'auto': [],
|
|
'dmr': ['-fd'],
|
|
'p25': ['-fp'],
|
|
'nxdn': ['-fn'],
|
|
'dstar': ['-fi'],
|
|
'provoice': ['-fv'],
|
|
}
|
|
|
|
# dsd-fme uses different flag names
|
|
_DSD_FME_PROTOCOL_FLAGS = {
|
|
'auto': ['-ft'],
|
|
'dmr': ['-fs'],
|
|
'p25': ['-f1'],
|
|
'nxdn': ['-fi'],
|
|
'dstar': [],
|
|
'provoice': ['-fp'],
|
|
}
|
|
|
|
# ============================================
|
|
# HELPERS
|
|
# ============================================
|
|
|
|
|
|
def find_dsd() -> tuple[str | None, bool]:
|
|
"""Find DSD (Digital Speech Decoder) binary.
|
|
|
|
Checks for dsd-fme first (common fork), then falls back to dsd.
|
|
Returns (path, is_fme) tuple.
|
|
"""
|
|
path = shutil.which('dsd-fme')
|
|
if path:
|
|
return path, True
|
|
path = shutil.which('dsd')
|
|
if path:
|
|
return path, False
|
|
return None, False
|
|
|
|
|
|
def find_rtl_fm() -> str | None:
|
|
"""Find rtl_fm binary."""
|
|
return shutil.which('rtl_fm')
|
|
|
|
|
|
def parse_dsd_output(line: str) -> dict | None:
|
|
"""Parse a line of DSD stderr output into a structured event."""
|
|
line = line.strip()
|
|
if not line:
|
|
return None
|
|
|
|
# Sync detection: "Sync: +DMR (data)" or "Sync: +P25 Phase 1"
|
|
sync_match = re.match(r'Sync:\s*\+?(\S+.*)', line)
|
|
if sync_match:
|
|
return {
|
|
'type': 'sync',
|
|
'protocol': sync_match.group(1).strip(),
|
|
'timestamp': datetime.now().strftime('%H:%M:%S'),
|
|
}
|
|
|
|
# Talkgroup and Source: "TG: 12345 Src: 67890"
|
|
tg_match = re.match(r'.*TG:\s*(\d+)\s+Src:\s*(\d+)', line)
|
|
if tg_match:
|
|
return {
|
|
'type': 'call',
|
|
'talkgroup': int(tg_match.group(1)),
|
|
'source_id': int(tg_match.group(2)),
|
|
'timestamp': datetime.now().strftime('%H:%M:%S'),
|
|
}
|
|
|
|
# Slot info: "Slot 1" or "Slot 2"
|
|
slot_match = re.match(r'.*Slot\s*(\d+)', line)
|
|
if slot_match:
|
|
return {
|
|
'type': 'slot',
|
|
'slot': int(slot_match.group(1)),
|
|
'timestamp': datetime.now().strftime('%H:%M:%S'),
|
|
}
|
|
|
|
# DMR voice frame
|
|
if 'Voice' in line or 'voice' in line:
|
|
return {
|
|
'type': 'voice',
|
|
'detail': line,
|
|
'timestamp': datetime.now().strftime('%H:%M:%S'),
|
|
}
|
|
|
|
# P25 NAC (Network Access Code)
|
|
nac_match = re.match(r'.*NAC:\s*(\w+)', line)
|
|
if nac_match:
|
|
return {
|
|
'type': 'nac',
|
|
'nac': nac_match.group(1),
|
|
'timestamp': datetime.now().strftime('%H:%M:%S'),
|
|
}
|
|
|
|
return None
|
|
|
|
|
|
def stream_dsd_output(rtl_process: subprocess.Popen, dsd_process: subprocess.Popen):
|
|
"""Read DSD stderr output and push parsed events to the queue."""
|
|
global dmr_running
|
|
|
|
try:
|
|
dmr_queue.put_nowait({'type': 'status', 'text': 'started'})
|
|
|
|
while dmr_running:
|
|
if dsd_process.poll() is not None:
|
|
break
|
|
|
|
line = dsd_process.stderr.readline()
|
|
if not line:
|
|
if dsd_process.poll() is not None:
|
|
break
|
|
continue
|
|
|
|
text = line.decode('utf-8', errors='replace').strip()
|
|
if not text:
|
|
continue
|
|
|
|
parsed = parse_dsd_output(text)
|
|
if parsed:
|
|
try:
|
|
dmr_queue.put_nowait(parsed)
|
|
except queue.Full:
|
|
try:
|
|
dmr_queue.get_nowait()
|
|
except queue.Empty:
|
|
pass
|
|
try:
|
|
dmr_queue.put_nowait(parsed)
|
|
except queue.Full:
|
|
pass
|
|
|
|
except Exception as e:
|
|
logger.error(f"DSD stream error: {e}")
|
|
finally:
|
|
dmr_running = False
|
|
try:
|
|
dmr_queue.put_nowait({'type': 'status', 'text': 'stopped'})
|
|
except queue.Full:
|
|
pass
|
|
logger.info("DSD stream thread stopped")
|
|
|
|
|
|
# ============================================
|
|
# API ENDPOINTS
|
|
# ============================================
|
|
|
|
@dmr_bp.route('/tools')
|
|
def check_tools() -> Response:
|
|
"""Check for required tools."""
|
|
dsd_path, _ = find_dsd()
|
|
rtl_fm = find_rtl_fm()
|
|
return jsonify({
|
|
'dsd': dsd_path is not None,
|
|
'rtl_fm': rtl_fm is not None,
|
|
'available': dsd_path is not None and rtl_fm is not None,
|
|
'protocols': VALID_PROTOCOLS,
|
|
})
|
|
|
|
|
|
@dmr_bp.route('/start', methods=['POST'])
|
|
def start_dmr() -> Response:
|
|
"""Start digital voice decoding."""
|
|
global dmr_rtl_process, dmr_dsd_process, dmr_thread, dmr_running, dmr_active_device
|
|
|
|
with dmr_lock:
|
|
if dmr_running:
|
|
return jsonify({'status': 'error', 'message': 'Already running'}), 409
|
|
|
|
dsd_path, is_fme = find_dsd()
|
|
if not dsd_path:
|
|
return jsonify({'status': 'error', 'message': 'dsd not found. Install dsd-fme or dsd.'}), 503
|
|
|
|
rtl_fm_path = find_rtl_fm()
|
|
if not rtl_fm_path:
|
|
return jsonify({'status': 'error', 'message': 'rtl_fm not found. Install rtl-sdr tools.'}), 503
|
|
|
|
data = request.json or {}
|
|
|
|
try:
|
|
frequency = float(data.get('frequency', 462.5625))
|
|
gain = int(data.get('gain', 40))
|
|
device = int(data.get('device', 0))
|
|
protocol = str(data.get('protocol', 'auto')).lower()
|
|
except (ValueError, TypeError) as e:
|
|
return jsonify({'status': 'error', 'message': f'Invalid parameter: {e}'}), 400
|
|
|
|
if frequency <= 0:
|
|
return jsonify({'status': 'error', 'message': 'Frequency must be positive'}), 400
|
|
|
|
if protocol not in VALID_PROTOCOLS:
|
|
return jsonify({'status': 'error', 'message': f'Invalid protocol. Use: {", ".join(VALID_PROTOCOLS)}'}), 400
|
|
|
|
# Clear stale queue
|
|
try:
|
|
while True:
|
|
dmr_queue.get_nowait()
|
|
except queue.Empty:
|
|
pass
|
|
|
|
# Claim SDR device
|
|
error = app_module.claim_sdr_device(device, 'dmr')
|
|
if error:
|
|
return jsonify({'status': 'error', 'error_type': 'DEVICE_BUSY', 'message': error}), 409
|
|
|
|
dmr_active_device = device
|
|
|
|
freq_hz = int(frequency * 1e6)
|
|
|
|
# Build rtl_fm command (48kHz sample rate for DSD)
|
|
rtl_cmd = [
|
|
rtl_fm_path,
|
|
'-M', 'fm',
|
|
'-f', str(freq_hz),
|
|
'-s', '48000',
|
|
'-g', str(gain),
|
|
'-d', str(device),
|
|
'-l', '1', # squelch level
|
|
]
|
|
|
|
# Build DSD command
|
|
# Use -o - to send decoded audio to stdout (piped to DEVNULL)
|
|
# instead of PulseAudio which may not be available under sudo
|
|
dsd_cmd = [dsd_path, '-i', '-', '-o', '-']
|
|
if is_fme:
|
|
dsd_cmd.extend(_DSD_FME_PROTOCOL_FLAGS.get(protocol, []))
|
|
else:
|
|
dsd_cmd.extend(_DSD_PROTOCOL_FLAGS.get(protocol, []))
|
|
|
|
try:
|
|
dmr_rtl_process = subprocess.Popen(
|
|
rtl_cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
|
|
dmr_dsd_process = subprocess.Popen(
|
|
dsd_cmd,
|
|
stdin=dmr_rtl_process.stdout,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
# Allow rtl_fm to send directly to dsd
|
|
dmr_rtl_process.stdout.close()
|
|
|
|
time.sleep(0.3)
|
|
|
|
rtl_rc = dmr_rtl_process.poll()
|
|
dsd_rc = dmr_dsd_process.poll()
|
|
if rtl_rc is not None or dsd_rc is not None:
|
|
# Process died — capture stderr for diagnostics
|
|
dsd_err = ''
|
|
if dmr_dsd_process.stderr:
|
|
dsd_err = dmr_dsd_process.stderr.read().decode('utf-8', errors='replace')[:500]
|
|
logger.error(f"DSD pipeline died: rtl_fm rc={rtl_rc}, dsd rc={dsd_rc}, dsd stderr={dsd_err!r}")
|
|
if dmr_active_device is not None:
|
|
app_module.release_sdr_device(dmr_active_device)
|
|
dmr_active_device = None
|
|
msg = 'Failed to start DSD pipeline'
|
|
if dsd_err:
|
|
msg += f': {dsd_err.strip()}'
|
|
return jsonify({'status': 'error', 'message': msg}), 500
|
|
|
|
dmr_running = True
|
|
dmr_thread = threading.Thread(
|
|
target=stream_dsd_output,
|
|
args=(dmr_rtl_process, dmr_dsd_process),
|
|
daemon=True,
|
|
)
|
|
dmr_thread.start()
|
|
|
|
return jsonify({
|
|
'status': 'started',
|
|
'frequency': frequency,
|
|
'protocol': protocol,
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start DMR: {e}")
|
|
if dmr_active_device is not None:
|
|
app_module.release_sdr_device(dmr_active_device)
|
|
dmr_active_device = None
|
|
return jsonify({'status': 'error', 'message': str(e)}), 500
|
|
|
|
|
|
@dmr_bp.route('/stop', methods=['POST'])
|
|
def stop_dmr() -> Response:
|
|
"""Stop digital voice decoding."""
|
|
global dmr_rtl_process, dmr_dsd_process, dmr_running, dmr_active_device
|
|
|
|
dmr_running = False
|
|
|
|
for proc in [dmr_dsd_process, dmr_rtl_process]:
|
|
if proc and proc.poll() is None:
|
|
try:
|
|
proc.terminate()
|
|
proc.wait(timeout=2)
|
|
except Exception:
|
|
try:
|
|
proc.kill()
|
|
except Exception:
|
|
pass
|
|
|
|
dmr_rtl_process = None
|
|
dmr_dsd_process = None
|
|
|
|
if dmr_active_device is not None:
|
|
app_module.release_sdr_device(dmr_active_device)
|
|
dmr_active_device = None
|
|
|
|
return jsonify({'status': 'stopped'})
|
|
|
|
|
|
@dmr_bp.route('/status')
|
|
def dmr_status() -> Response:
|
|
"""Get DMR decoder status."""
|
|
return jsonify({
|
|
'running': dmr_running,
|
|
'device': dmr_active_device,
|
|
})
|
|
|
|
|
|
@dmr_bp.route('/stream')
|
|
def stream_dmr() -> Response:
|
|
"""SSE stream for DMR decoder events."""
|
|
def generate() -> Generator[str, None, None]:
|
|
last_keepalive = time.time()
|
|
while True:
|
|
try:
|
|
msg = dmr_queue.get(timeout=SSE_QUEUE_TIMEOUT)
|
|
last_keepalive = time.time()
|
|
yield format_sse(msg)
|
|
except queue.Empty:
|
|
now = time.time()
|
|
if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL:
|
|
yield format_sse({'type': 'keepalive'})
|
|
last_keepalive = now
|
|
|
|
response = Response(generate(), mimetype='text/event-stream')
|
|
response.headers['Cache-Control'] = 'no-cache'
|
|
response.headers['X-Accel-Buffering'] = 'no'
|
|
return response
|