fix: SDR device registry collision with multiple SDR types

The registry used plain int keys (device index), so HackRF at index 0
and RTL-SDR at index 0 would collide. Changed to composite string keys
("sdr_type:index") so each SDR type+index pair is tracked independently.
Updated all route callers, frontend device selectors, and session restore.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-27 09:06:41 +00:00
parent 0d13638d70
commit 5aa68a49c6
17 changed files with 907 additions and 786 deletions

46
app.py
View File

@@ -257,12 +257,12 @@ cleanup_manager.register(deauth_alerts)
# SDR DEVICE REGISTRY
# ============================================
# Tracks which mode is using which SDR device to prevent conflicts
# Key: device_index (int), Value: mode_name (str)
sdr_device_registry: dict[int, str] = {}
# Key: "sdr_type:device_index" (str), Value: mode_name (str)
sdr_device_registry: dict[str, str] = {}
sdr_device_registry_lock = threading.Lock()
def claim_sdr_device(device_index: int, mode_name: str) -> str | None:
def claim_sdr_device(device_index: int, mode_name: str, sdr_type: str = 'rtlsdr') -> str | None:
"""Claim an SDR device for a mode.
Checks the in-app registry first, then probes the USB device to
@@ -272,43 +272,48 @@ def claim_sdr_device(device_index: int, mode_name: str) -> str | None:
Args:
device_index: The SDR device index to claim
mode_name: Name of the mode claiming the device (e.g., 'sensor', 'rtlamr')
sdr_type: SDR type string (e.g., 'rtlsdr', 'hackrf', 'limesdr')
Returns:
Error message if device is in use, None if successfully claimed
"""
key = f"{sdr_type}:{device_index}"
with sdr_device_registry_lock:
if device_index in sdr_device_registry:
in_use_by = sdr_device_registry[device_index]
return f'SDR device {device_index} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.'
if key in sdr_device_registry:
in_use_by = sdr_device_registry[key]
return f'SDR device {sdr_type}:{device_index} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.'
# Probe the USB device to catch external processes holding the handle
try:
from utils.sdr.detection import probe_rtlsdr_device
usb_error = probe_rtlsdr_device(device_index)
if usb_error:
return usb_error
except Exception:
pass # If probe fails, let the caller proceed normally
if sdr_type == 'rtlsdr':
try:
from utils.sdr.detection import probe_rtlsdr_device
usb_error = probe_rtlsdr_device(device_index)
if usb_error:
return usb_error
except Exception:
pass # If probe fails, let the caller proceed normally
sdr_device_registry[device_index] = mode_name
sdr_device_registry[key] = mode_name
return None
def release_sdr_device(device_index: int) -> None:
def release_sdr_device(device_index: int, sdr_type: str = 'rtlsdr') -> None:
"""Release an SDR device from the registry.
Args:
device_index: The SDR device index to release
sdr_type: SDR type string (e.g., 'rtlsdr', 'hackrf', 'limesdr')
"""
key = f"{sdr_type}:{device_index}"
with sdr_device_registry_lock:
sdr_device_registry.pop(device_index, None)
sdr_device_registry.pop(key, None)
def get_sdr_device_status() -> dict[int, str]:
def get_sdr_device_status() -> dict[str, str]:
"""Get current SDR device allocations.
Returns:
Dictionary mapping device indices to mode names
Dictionary mapping 'sdr_type:device_index' keys to mode names
"""
with sdr_device_registry_lock:
return dict(sdr_device_registry)
@@ -429,8 +434,9 @@ def get_devices_status() -> Response:
result = []
for device in devices:
d = device.to_dict()
d['in_use'] = device.index in registry
d['used_by'] = registry.get(device.index)
key = f"{device.sdr_type.value}:{device.index}"
d['in_use'] = key in registry
d['used_by'] = registry.get(key)
result.append(d)
return jsonify(result)

View File

@@ -21,7 +21,7 @@ import app as app_module
from utils.logging import sensor_logger as logger
from utils.validation import validate_device_index, validate_gain, validate_ppm
from utils.sdr import SDRFactory, SDRType
from utils.sse import sse_stream_fanout
from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
from utils.constants import (
PROCESS_TERMINATE_TIMEOUT,
@@ -45,6 +45,7 @@ acars_last_message_time = None
# Track which device is being used
acars_active_device: int | None = None
acars_active_sdr_type: str | None = None
def find_acarsdec():
@@ -151,7 +152,7 @@ def stream_acars_output(process: subprocess.Popen, is_text_mode: bool = False) -
logger.error(f"ACARS stream error: {e}")
app_module.acars_queue.put({'type': 'error', 'message': str(e)})
finally:
global acars_active_device
global acars_active_device, acars_active_sdr_type
# Ensure process is terminated
try:
process.terminate()
@@ -167,8 +168,9 @@ def stream_acars_output(process: subprocess.Popen, is_text_mode: bool = False) -
app_module.acars_process = None
# Release SDR device
if acars_active_device is not None:
app_module.release_sdr_device(acars_active_device)
app_module.release_sdr_device(acars_active_device, acars_active_sdr_type or 'rtlsdr')
acars_active_device = None
acars_active_sdr_type = None
@acars_bp.route('/tools')
@@ -200,7 +202,7 @@ def acars_status() -> Response:
@acars_bp.route('/start', methods=['POST'])
def start_acars() -> Response:
"""Start ACARS decoder."""
global acars_message_count, acars_last_message_time, acars_active_device
global acars_message_count, acars_last_message_time, acars_active_device, acars_active_sdr_type
with app_module.acars_lock:
if app_module.acars_process and app_module.acars_process.poll() is None:
@@ -227,9 +229,12 @@ def start_acars() -> Response:
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
# Resolve SDR type for device selection
sdr_type_str = data.get('sdr_type', 'rtlsdr')
# Check if device is available
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'acars')
error = app_module.claim_sdr_device(device_int, 'acars', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -238,6 +243,7 @@ def start_acars() -> Response:
}), 409
acars_active_device = device_int
acars_active_sdr_type = sdr_type_str
# Get frequencies - use provided or defaults
frequencies = data.get('frequencies', DEFAULT_ACARS_FREQUENCIES)
@@ -255,8 +261,6 @@ def start_acars() -> Response:
acars_message_count = 0
acars_last_message_time = None
# Resolve SDR type for device selection
sdr_type_str = data.get('sdr_type', 'rtlsdr')
try:
sdr_type = SDRType(sdr_type_str)
except ValueError:
@@ -343,8 +347,9 @@ def start_acars() -> Response:
if process.poll() is not None:
# Process died - release device
if acars_active_device is not None:
app_module.release_sdr_device(acars_active_device)
app_module.release_sdr_device(acars_active_device, acars_active_sdr_type or 'rtlsdr')
acars_active_device = None
acars_active_sdr_type = None
stderr = ''
if process.stderr:
stderr = process.stderr.read().decode('utf-8', errors='replace')
@@ -375,8 +380,9 @@ def start_acars() -> Response:
except Exception as e:
# Release device on failure
if acars_active_device is not None:
app_module.release_sdr_device(acars_active_device)
app_module.release_sdr_device(acars_active_device, acars_active_sdr_type or 'rtlsdr')
acars_active_device = None
acars_active_sdr_type = None
logger.error(f"Failed to start ACARS decoder: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@@ -384,7 +390,7 @@ def start_acars() -> Response:
@acars_bp.route('/stop', methods=['POST'])
def stop_acars() -> Response:
"""Stop ACARS decoder."""
global acars_active_device
global acars_active_device, acars_active_sdr_type
with app_module.acars_lock:
if not app_module.acars_process:
@@ -405,31 +411,32 @@ def stop_acars() -> Response:
# Release device from registry
if acars_active_device is not None:
app_module.release_sdr_device(acars_active_device)
app_module.release_sdr_device(acars_active_device, acars_active_sdr_type or 'rtlsdr')
acars_active_device = None
acars_active_sdr_type = None
return jsonify({'status': 'stopped'})
@acars_bp.route('/stream')
def stream_acars() -> Response:
"""SSE stream for ACARS messages."""
def _on_msg(msg: dict[str, Any]) -> None:
process_event('acars', msg, msg.get('type'))
response = Response(
sse_stream_fanout(
source_queue=app_module.acars_queue,
channel_key='acars',
timeout=SSE_QUEUE_TIMEOUT,
keepalive_interval=SSE_KEEPALIVE_INTERVAL,
on_message=_on_msg,
),
mimetype='text/event-stream',
)
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
return response
@acars_bp.route('/stream')
def stream_acars() -> Response:
"""SSE stream for ACARS messages."""
def _on_msg(msg: dict[str, Any]) -> None:
process_event('acars', msg, msg.get('type'))
response = Response(
sse_stream_fanout(
source_queue=app_module.acars_queue,
channel_key='acars',
timeout=SSE_QUEUE_TIMEOUT,
keepalive_interval=SSE_KEEPALIVE_INTERVAL,
on_message=_on_msg,
),
mimetype='text/event-stream',
)
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
return response
@acars_bp.route('/frequencies')

View File

@@ -72,6 +72,7 @@ adsb_last_message_time = None
adsb_bytes_received = 0
adsb_lines_received = 0
adsb_active_device = None # Track which device index is being used
adsb_active_sdr_type: str | None = None
_sbs_error_logged = False # Suppress repeated connection error logs
# Track ICAOs already looked up in aircraft database (avoid repeated lookups)
@@ -674,7 +675,7 @@ def adsb_session():
@adsb_bp.route('/start', methods=['POST'])
def start_adsb():
"""Start ADS-B tracking."""
global adsb_using_service, adsb_active_device
global adsb_using_service, adsb_active_device, adsb_active_sdr_type
with app_module.adsb_lock:
if adsb_using_service:
@@ -787,7 +788,7 @@ def start_adsb():
# Check if device is available before starting local dump1090
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'adsb')
error = app_module.claim_sdr_device(device_int, 'adsb', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -825,7 +826,7 @@ def start_adsb():
if app_module.adsb_process.poll() is not None:
# Process exited - release device and get error message
app_module.release_sdr_device(device_int)
app_module.release_sdr_device(device_int, sdr_type_str)
stderr_output = ''
if app_module.adsb_process.stderr:
try:
@@ -872,6 +873,7 @@ def start_adsb():
adsb_using_service = True
adsb_active_device = device # Track which device is being used
adsb_active_sdr_type = sdr_type_str
thread = threading.Thread(target=parse_sbs_stream, args=(f'localhost:{ADSB_SBS_PORT}',), daemon=True)
thread.start()
@@ -891,14 +893,14 @@ def start_adsb():
})
except Exception as e:
# Release device on failure
app_module.release_sdr_device(device_int)
app_module.release_sdr_device(device_int, sdr_type_str)
return jsonify({'status': 'error', 'message': str(e)})
@adsb_bp.route('/stop', methods=['POST'])
def stop_adsb():
"""Stop ADS-B tracking."""
global adsb_using_service, adsb_active_device
global adsb_using_service, adsb_active_device, adsb_active_sdr_type
data = request.get_json(silent=True) or {}
stop_source = data.get('source')
stopped_by = request.remote_addr
@@ -923,10 +925,11 @@ def stop_adsb():
# Release device from registry
if adsb_active_device is not None:
app_module.release_sdr_device(adsb_active_device)
app_module.release_sdr_device(adsb_active_device, adsb_active_sdr_type or 'rtlsdr')
adsb_using_service = False
adsb_active_device = None
adsb_active_sdr_type = None
app_module.adsb_aircraft.clear()
_looked_up_icaos.clear()

View File

@@ -44,6 +44,7 @@ ais_connected = False
ais_messages_received = 0
ais_last_message_time = None
ais_active_device = None
ais_active_sdr_type: str | None = None
_ais_error_logged = True
# Common installation paths for AIS-catcher
@@ -350,7 +351,7 @@ def ais_status():
@ais_bp.route('/start', methods=['POST'])
def start_ais():
"""Start AIS tracking."""
global ais_running, ais_active_device
global ais_running, ais_active_device, ais_active_sdr_type
with app_module.ais_lock:
if ais_running:
@@ -397,7 +398,7 @@ def start_ais():
# Check if device is available
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'ais')
error = app_module.claim_sdr_device(device_int, 'ais', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -436,7 +437,7 @@ def start_ais():
if app_module.ais_process.poll() is not None:
# Release device on failure
app_module.release_sdr_device(device_int)
app_module.release_sdr_device(device_int, sdr_type_str)
stderr_output = ''
if app_module.ais_process.stderr:
try:
@@ -450,6 +451,7 @@ def start_ais():
ais_running = True
ais_active_device = device
ais_active_sdr_type = sdr_type_str
# Start TCP parser thread
thread = threading.Thread(target=parse_ais_stream, args=(tcp_port,), daemon=True)
@@ -463,7 +465,7 @@ def start_ais():
})
except Exception as e:
# Release device on failure
app_module.release_sdr_device(device_int)
app_module.release_sdr_device(device_int, sdr_type_str)
logger.error(f"Failed to start AIS-catcher: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@@ -471,7 +473,7 @@ def start_ais():
@ais_bp.route('/stop', methods=['POST'])
def stop_ais():
"""Stop AIS tracking."""
global ais_running, ais_active_device
global ais_running, ais_active_device, ais_active_sdr_type
with app_module.ais_lock:
if app_module.ais_process:
@@ -490,10 +492,11 @@ def stop_ais():
# Release device from registry
if ais_active_device is not None:
app_module.release_sdr_device(ais_active_device)
app_module.release_sdr_device(ais_active_device, ais_active_sdr_type or 'rtlsdr')
ais_running = False
ais_active_device = None
ais_active_sdr_type = None
app_module.ais_vessels.clear()
return jsonify({'status': 'stopped'})

View File

@@ -37,6 +37,7 @@ aprs_bp = Blueprint('aprs', __name__, url_prefix='/aprs')
# Track which SDR device is being used
aprs_active_device: int | None = None
aprs_active_sdr_type: str | None = None
# APRS frequencies by region (MHz)
APRS_FREQUENCIES = {
@@ -1454,7 +1455,7 @@ def stream_aprs_output(master_fd: int, rtl_process: subprocess.Popen, decoder_pr
- type='meter': Audio level meter readings (rate-limited)
"""
global aprs_packet_count, aprs_station_count, aprs_last_packet_time, aprs_stations
global _last_meter_time, _last_meter_level, aprs_active_device
global _last_meter_time, _last_meter_level, aprs_active_device, aprs_active_sdr_type
# Capture the device claimed by THIS session so the finally block only
# releases our own device, not one claimed by a subsequent start.
@@ -1588,8 +1589,9 @@ def stream_aprs_output(master_fd: int, rtl_process: subprocess.Popen, decoder_pr
pass
# Release SDR device — only if it's still ours (not reclaimed by a new start)
if my_device is not None and aprs_active_device == my_device:
app_module.release_sdr_device(my_device)
app_module.release_sdr_device(my_device, aprs_active_sdr_type or 'rtlsdr')
aprs_active_device = None
aprs_active_sdr_type = None
@aprs_bp.route('/tools')
@@ -1658,7 +1660,7 @@ def aprs_data() -> Response:
def start_aprs() -> Response:
"""Start APRS decoder."""
global aprs_packet_count, aprs_station_count, aprs_last_packet_time, aprs_stations
global aprs_active_device
global aprs_active_device, aprs_active_sdr_type
with app_module.aprs_lock:
if app_module.aprs_process and app_module.aprs_process.poll() is None:
@@ -1707,7 +1709,7 @@ def start_aprs() -> Response:
}), 400
# Reserve SDR device to prevent conflicts with other modes
error = app_module.claim_sdr_device(device, 'aprs')
error = app_module.claim_sdr_device(device, 'aprs', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -1715,6 +1717,7 @@ def start_aprs() -> Response:
'message': error
}), 409
aprs_active_device = device
aprs_active_sdr_type = sdr_type_str
# Get frequency for region
region = data.get('region', 'north_america')
@@ -1756,8 +1759,9 @@ def start_aprs() -> Response:
rtl_cmd = rtl_cmd[:-1] + ['-E', 'dc', '-A', 'fast', '-']
except Exception as e:
if aprs_active_device is not None:
app_module.release_sdr_device(aprs_active_device)
app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr')
aprs_active_device = None
aprs_active_sdr_type = None
return jsonify({'status': 'error', 'message': f'Failed to build SDR command: {e}'}), 500
# Build decoder command
@@ -1859,8 +1863,9 @@ def start_aprs() -> Response:
except Exception:
pass
if aprs_active_device is not None:
app_module.release_sdr_device(aprs_active_device)
app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr')
aprs_active_device = None
aprs_active_sdr_type = None
return jsonify({'status': 'error', 'message': error_msg}), 500
if decoder_process.poll() is not None:
@@ -1886,8 +1891,9 @@ def start_aprs() -> Response:
except Exception:
pass
if aprs_active_device is not None:
app_module.release_sdr_device(aprs_active_device)
app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr')
aprs_active_device = None
aprs_active_sdr_type = None
return jsonify({'status': 'error', 'message': error_msg}), 500
# Store references for status checks and cleanup
@@ -1915,15 +1921,16 @@ def start_aprs() -> Response:
except Exception as e:
logger.error(f"Failed to start APRS decoder: {e}")
if aprs_active_device is not None:
app_module.release_sdr_device(aprs_active_device)
app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr')
aprs_active_device = None
aprs_active_sdr_type = None
return jsonify({'status': 'error', 'message': str(e)}), 500
@aprs_bp.route('/stop', methods=['POST'])
def stop_aprs() -> Response:
"""Stop APRS decoder."""
global aprs_active_device
global aprs_active_device, aprs_active_sdr_type
with app_module.aprs_lock:
processes_to_stop = []
@@ -1963,8 +1970,9 @@ def stop_aprs() -> Response:
# Release SDR device
if aprs_active_device is not None:
app_module.release_sdr_device(aprs_active_device)
app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr')
aprs_active_device = None
aprs_active_sdr_type = None
return jsonify({'status': 'stopped'})

View File

@@ -51,6 +51,7 @@ dsc_running = False
# Track which device is being used
dsc_active_device: int | None = None
dsc_active_sdr_type: str | None = None
def _get_dsc_decoder_path() -> str | None:
@@ -171,7 +172,7 @@ def stream_dsc_decoder(master_fd: int, decoder_process: subprocess.Popen) -> Non
'error': str(e)
})
finally:
global dsc_active_device
global dsc_active_device, dsc_active_sdr_type
try:
os.close(master_fd)
except OSError:
@@ -197,8 +198,9 @@ def stream_dsc_decoder(master_fd: int, decoder_process: subprocess.Popen) -> Non
app_module.dsc_rtl_process = None
# Release SDR device
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device)
app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr')
dsc_active_device = None
dsc_active_sdr_type = None
def _store_critical_alert(msg: dict) -> None:
@@ -331,10 +333,13 @@ def start_decoding() -> Response:
'message': str(e)
}), 400
# Get SDR type from request
sdr_type_str = data.get('sdr_type', 'rtlsdr')
# Check if device is available using centralized registry
global dsc_active_device
global dsc_active_device, dsc_active_sdr_type
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'dsc')
error = app_module.claim_sdr_device(device_int, 'dsc', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -343,6 +348,7 @@ def start_decoding() -> Response:
}), 409
dsc_active_device = device_int
dsc_active_sdr_type = sdr_type_str
# Clear queue
while not app_module.dsc_queue.empty():
@@ -440,8 +446,9 @@ def start_decoding() -> Response:
pass
# Release device on failure
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device)
app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr')
dsc_active_device = None
dsc_active_sdr_type = None
return jsonify({
'status': 'error',
'message': f'Tool not found: {e.filename}'
@@ -458,8 +465,9 @@ def start_decoding() -> Response:
pass
# Release device on failure
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device)
app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr')
dsc_active_device = None
dsc_active_sdr_type = None
logger.error(f"Failed to start DSC decoder: {e}")
return jsonify({
'status': 'error',
@@ -470,7 +478,7 @@ def start_decoding() -> Response:
@dsc_bp.route('/stop', methods=['POST'])
def stop_decoding() -> Response:
"""Stop DSC decoder."""
global dsc_running, dsc_active_device
global dsc_running, dsc_active_device, dsc_active_sdr_type
with app_module.dsc_lock:
if not app_module.dsc_process:
@@ -509,8 +517,9 @@ def stop_decoding() -> Response:
# Release device from registry
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device)
app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr')
dsc_active_device = None
dsc_active_sdr_type = None
return jsonify({'status': 'stopped'})

View File

@@ -55,7 +55,9 @@ scanner_lock = threading.Lock()
scanner_paused = False
scanner_current_freq = 0.0
scanner_active_device: Optional[int] = None
scanner_active_sdr_type: str = 'rtlsdr'
receiver_active_device: Optional[int] = None
receiver_active_sdr_type: str = 'rtlsdr'
scanner_power_process: Optional[subprocess.Popen] = None
scanner_config = {
'start_freq': 88.0,
@@ -996,7 +998,7 @@ def check_tools() -> Response:
@receiver_bp.route('/scanner/start', methods=['POST'])
def start_scanner() -> Response:
"""Start the frequency scanner."""
global scanner_thread, scanner_running, scanner_config, scanner_active_device, receiver_active_device
global scanner_thread, scanner_running, scanner_config, scanner_active_device, scanner_active_sdr_type, receiver_active_device, receiver_active_sdr_type
with scanner_lock:
if scanner_running:
@@ -1063,10 +1065,11 @@ def start_scanner() -> Response:
}), 503
# Release listening device if active
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type)
receiver_active_device = None
receiver_active_sdr_type = 'rtlsdr'
# Claim device for scanner
error = app_module.claim_sdr_device(scanner_config['device'], 'scanner')
error = app_module.claim_sdr_device(scanner_config['device'], 'scanner', scanner_config['sdr_type'])
if error:
return jsonify({
'status': 'error',
@@ -1074,6 +1077,7 @@ def start_scanner() -> Response:
'message': error
}), 409
scanner_active_device = scanner_config['device']
scanner_active_sdr_type = scanner_config['sdr_type']
scanner_running = True
scanner_thread = threading.Thread(target=scanner_loop_power, daemon=True)
scanner_thread.start()
@@ -1091,9 +1095,10 @@ def start_scanner() -> Response:
'message': f'rx_fm not found. Install SoapySDR utilities for {sdr_type}.'
}), 503
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type)
receiver_active_device = None
error = app_module.claim_sdr_device(scanner_config['device'], 'scanner')
receiver_active_sdr_type = 'rtlsdr'
error = app_module.claim_sdr_device(scanner_config['device'], 'scanner', scanner_config['sdr_type'])
if error:
return jsonify({
'status': 'error',
@@ -1101,6 +1106,7 @@ def start_scanner() -> Response:
'message': error
}), 409
scanner_active_device = scanner_config['device']
scanner_active_sdr_type = scanner_config['sdr_type']
scanner_running = True
scanner_thread = threading.Thread(target=scanner_loop, daemon=True)
@@ -1115,7 +1121,7 @@ def start_scanner() -> Response:
@receiver_bp.route('/scanner/stop', methods=['POST'])
def stop_scanner() -> Response:
"""Stop the frequency scanner."""
global scanner_running, scanner_active_device, scanner_power_process
global scanner_running, scanner_active_device, scanner_active_sdr_type, scanner_power_process
scanner_running = False
_stop_audio_stream()
@@ -1130,8 +1136,9 @@ def stop_scanner() -> Response:
pass
scanner_power_process = None
if scanner_active_device is not None:
app_module.release_sdr_device(scanner_active_device)
app_module.release_sdr_device(scanner_active_device, scanner_active_sdr_type)
scanner_active_device = None
scanner_active_sdr_type = 'rtlsdr'
return jsonify({'status': 'stopped'})
@@ -1296,7 +1303,7 @@ def get_presets() -> Response:
@receiver_bp.route('/audio/start', methods=['POST'])
def start_audio() -> Response:
"""Start audio at specific frequency (manual mode)."""
global scanner_running, scanner_active_device, receiver_active_device, scanner_power_process, scanner_thread
global scanner_running, scanner_active_device, scanner_active_sdr_type, receiver_active_device, receiver_active_sdr_type, scanner_power_process, scanner_thread
global audio_running, audio_frequency, audio_modulation, audio_source, audio_start_token
data = request.json or {}
@@ -1356,8 +1363,9 @@ def start_audio() -> Response:
if scanner_running:
scanner_running = False
if scanner_active_device is not None:
app_module.release_sdr_device(scanner_active_device)
app_module.release_sdr_device(scanner_active_device, scanner_active_sdr_type)
scanner_active_device = None
scanner_active_sdr_type = 'rtlsdr'
scanner_thread_ref = scanner_thread
scanner_proc_ref = scanner_power_process
scanner_power_process = None
@@ -1419,8 +1427,9 @@ def start_audio() -> Response:
audio_source = 'waterfall'
# Shared monitor uses the waterfall's existing SDR claim.
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type)
receiver_active_device = None
receiver_active_sdr_type = 'rtlsdr'
return jsonify({
'status': 'started',
'frequency': frequency,
@@ -1443,13 +1452,14 @@ def start_audio() -> Response:
# to give the USB device time to be fully released.
if receiver_active_device is None or receiver_active_device != device:
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type)
receiver_active_device = None
receiver_active_sdr_type = 'rtlsdr'
error = None
max_claim_attempts = 6
for attempt in range(max_claim_attempts):
error = app_module.claim_sdr_device(device, 'receiver')
error = app_module.claim_sdr_device(device, 'receiver', sdr_type)
if not error:
break
if attempt < max_claim_attempts - 1:
@@ -1466,6 +1476,7 @@ def start_audio() -> Response:
'message': error
}), 409
receiver_active_device = device
receiver_active_sdr_type = sdr_type
_start_audio_stream(
frequency,
@@ -1489,8 +1500,9 @@ def start_audio() -> Response:
# Avoid leaving a stale device claim after startup failure.
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type)
receiver_active_device = None
receiver_active_sdr_type = 'rtlsdr'
start_error = ''
for log_path in ('/tmp/rtl_fm_stderr.log', '/tmp/ffmpeg_stderr.log'):
@@ -1515,11 +1527,12 @@ def start_audio() -> Response:
@receiver_bp.route('/audio/stop', methods=['POST'])
def stop_audio() -> Response:
"""Stop audio."""
global receiver_active_device
global receiver_active_device, receiver_active_sdr_type
_stop_audio_stream()
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type)
receiver_active_device = None
receiver_active_sdr_type = 'rtlsdr'
return jsonify({'status': 'stopped'})
@@ -1825,6 +1838,7 @@ waterfall_running = False
waterfall_lock = threading.Lock()
waterfall_queue: queue.Queue = queue.Queue(maxsize=200)
waterfall_active_device: Optional[int] = None
waterfall_active_sdr_type: str = 'rtlsdr'
waterfall_config = {
'start_freq': 88.0,
'end_freq': 108.0,
@@ -2033,7 +2047,7 @@ def _waterfall_loop():
def _stop_waterfall_internal() -> None:
"""Stop the waterfall display and release resources."""
global waterfall_running, waterfall_process, waterfall_active_device
global waterfall_running, waterfall_process, waterfall_active_device, waterfall_active_sdr_type
waterfall_running = False
if waterfall_process and waterfall_process.poll() is None:
@@ -2048,14 +2062,15 @@ def _stop_waterfall_internal() -> None:
waterfall_process = None
if waterfall_active_device is not None:
app_module.release_sdr_device(waterfall_active_device)
app_module.release_sdr_device(waterfall_active_device, waterfall_active_sdr_type)
waterfall_active_device = None
waterfall_active_sdr_type = 'rtlsdr'
@receiver_bp.route('/waterfall/start', methods=['POST'])
def start_waterfall() -> Response:
"""Start the waterfall/spectrogram display."""
global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device
global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device, waterfall_active_sdr_type
with waterfall_lock:
if waterfall_running:
@@ -2101,11 +2116,12 @@ def start_waterfall() -> Response:
pass
# Claim SDR device
error = app_module.claim_sdr_device(waterfall_config['device'], 'waterfall')
error = app_module.claim_sdr_device(waterfall_config['device'], 'waterfall', 'rtlsdr')
if error:
return jsonify({'status': 'error', 'error_type': 'DEVICE_BUSY', 'message': error}), 409
waterfall_active_device = waterfall_config['device']
waterfall_active_sdr_type = 'rtlsdr'
waterfall_running = True
waterfall_thread = threading.Thread(target=_waterfall_loop, daemon=True)
waterfall_thread.start()

View File

@@ -51,6 +51,7 @@ class _FilteredQueue:
# Track which device is being used
morse_active_device: int | None = None
morse_active_sdr_type: str | None = None
# Runtime lifecycle state.
MORSE_IDLE = 'idle'
@@ -231,7 +232,7 @@ def _snapshot_live_resources() -> list[str]:
@morse_bp.route('/morse/start', methods=['POST'])
def start_morse() -> Response:
global morse_active_device, morse_decoder_worker, morse_stderr_worker
global morse_active_device, morse_active_sdr_type, morse_decoder_worker, morse_stderr_worker
global morse_stop_event, morse_control_queue, morse_runtime_config
global morse_last_error, morse_session_id
@@ -261,6 +262,8 @@ def start_morse() -> Response:
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
sdr_type_str = data.get('sdr_type', 'rtlsdr')
with app_module.morse_lock:
if morse_state in {MORSE_STARTING, MORSE_RUNNING, MORSE_STOPPING}:
return jsonify({
@@ -270,7 +273,7 @@ def start_morse() -> Response:
}), 409
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'morse')
error = app_module.claim_sdr_device(device_int, 'morse', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -279,6 +282,7 @@ def start_morse() -> Response:
}), 409
morse_active_device = device_int
morse_active_sdr_type = sdr_type_str
morse_last_error = ''
morse_session_id += 1
@@ -288,7 +292,6 @@ def start_morse() -> Response:
sample_rate = 22050
bias_t = _bool_value(data.get('bias_t', False), False)
sdr_type_str = data.get('sdr_type', 'rtlsdr')
try:
sdr_type = SDRType(sdr_type_str)
except ValueError:
@@ -408,7 +411,7 @@ def start_morse() -> Response:
for device_pos, candidate_device_index in enumerate(candidate_device_indices, start=1):
if candidate_device_index != active_device_index:
prev_device = active_device_index
claim_error = app_module.claim_sdr_device(candidate_device_index, 'morse')
claim_error = app_module.claim_sdr_device(candidate_device_index, 'morse', sdr_type_str)
if claim_error:
msg = f'{_device_label(candidate_device_index)} unavailable: {claim_error}'
attempt_errors.append(msg)
@@ -417,7 +420,7 @@ def start_morse() -> Response:
continue
if prev_device is not None:
app_module.release_sdr_device(prev_device)
app_module.release_sdr_device(prev_device, morse_active_sdr_type or 'rtlsdr')
active_device_index = candidate_device_index
with app_module.morse_lock:
morse_active_device = active_device_index
@@ -634,8 +637,9 @@ def start_morse() -> Response:
logger.error('Morse startup failed: %s', msg)
with app_module.morse_lock:
if morse_active_device is not None:
app_module.release_sdr_device(morse_active_device)
app_module.release_sdr_device(morse_active_device, morse_active_sdr_type or 'rtlsdr')
morse_active_device = None
morse_active_sdr_type = None
morse_last_error = msg
_set_state(MORSE_ERROR, msg)
_set_state(MORSE_IDLE, 'Idle')
@@ -675,8 +679,9 @@ def start_morse() -> Response:
)
with app_module.morse_lock:
if morse_active_device is not None:
app_module.release_sdr_device(morse_active_device)
app_module.release_sdr_device(morse_active_device, morse_active_sdr_type or 'rtlsdr')
morse_active_device = None
morse_active_sdr_type = None
morse_last_error = f'Tool not found: {e.filename}'
_set_state(MORSE_ERROR, morse_last_error)
_set_state(MORSE_IDLE, 'Idle')
@@ -692,8 +697,9 @@ def start_morse() -> Response:
)
with app_module.morse_lock:
if morse_active_device is not None:
app_module.release_sdr_device(morse_active_device)
app_module.release_sdr_device(morse_active_device, morse_active_sdr_type or 'rtlsdr')
morse_active_device = None
morse_active_sdr_type = None
morse_last_error = str(e)
_set_state(MORSE_ERROR, morse_last_error)
_set_state(MORSE_IDLE, 'Idle')
@@ -702,7 +708,7 @@ def start_morse() -> Response:
@morse_bp.route('/morse/stop', methods=['POST'])
def stop_morse() -> Response:
global morse_active_device, morse_decoder_worker, morse_stderr_worker
global morse_active_device, morse_active_sdr_type, morse_decoder_worker, morse_stderr_worker
global morse_stop_event, morse_control_queue
stop_started = time.perf_counter()
@@ -717,6 +723,7 @@ def stop_morse() -> Response:
stderr_thread = morse_stderr_worker or getattr(rtl_proc, '_stderr_thread', None)
control_queue = morse_control_queue or getattr(rtl_proc, '_control_queue', None)
active_device = morse_active_device
active_sdr_type = morse_active_sdr_type
if (
not rtl_proc
@@ -768,7 +775,7 @@ def stop_morse() -> Response:
_mark(f'stderr thread joined={stderr_joined}')
if active_device is not None:
app_module.release_sdr_device(active_device)
app_module.release_sdr_device(active_device, active_sdr_type or 'rtlsdr')
_mark(f'SDR device {active_device} released')
stop_ms = round((time.perf_counter() - stop_started) * 1000.0, 1)
@@ -782,6 +789,7 @@ def stop_morse() -> Response:
with app_module.morse_lock:
morse_active_device = None
morse_active_sdr_type = None
_set_state(MORSE_IDLE, 'Stopped', extra={
'stop_ms': stop_ms,
'cleanup_steps': cleanup_steps,

View File

@@ -24,7 +24,7 @@ from utils.validation import (
validate_frequency, validate_device_index, validate_gain, validate_ppm,
validate_rtl_tcp_host, validate_rtl_tcp_port
)
from utils.sse import sse_stream_fanout
from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType, SDRValidationError
@@ -34,6 +34,7 @@ pager_bp = Blueprint('pager', __name__)
# Track which device is being used
pager_active_device: int | None = None
pager_active_sdr_type: str | None = None
def parse_multimon_output(line: str) -> dict[str, str] | None:
@@ -96,7 +97,7 @@ def parse_multimon_output(line: str) -> dict[str, str] | None:
return None
def log_message(msg: dict[str, Any]) -> None:
def log_message(msg: dict[str, Any]) -> None:
"""Log a message to file if logging is enabled."""
if not app_module.logging_enabled:
return
@@ -104,39 +105,39 @@ def log_message(msg: dict[str, Any]) -> None:
with open(app_module.log_file_path, 'a') as f:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"{timestamp} | {msg.get('protocol', 'UNKNOWN')} | {msg.get('address', '')} | {msg.get('message', '')}\n")
except Exception as e:
logger.error(f"Failed to log message: {e}")
def _encode_scope_waveform(samples: tuple[int, ...], window_size: int = 256) -> list[int]:
"""Compress recent PCM samples into a signed 8-bit waveform for SSE."""
if not samples:
return []
window = samples[-window_size:] if len(samples) > window_size else samples
waveform: list[int] = []
for sample in window:
# Convert int16 PCM to int8 range for lightweight transport.
packed = int(round(sample / 256))
waveform.append(max(-127, min(127, packed)))
return waveform
def audio_relay_thread(
rtl_stdout,
multimon_stdin,
output_queue: queue.Queue,
stop_event: threading.Event,
) -> None:
"""Relay audio from rtl_fm to multimon-ng while computing signal levels.
Reads raw 16-bit LE PCM from *rtl_stdout*, writes every chunk straight
through to *multimon_stdin*, and every ~100 ms pushes an RMS / peak scope
event plus a compact waveform sample onto *output_queue*.
"""
CHUNK = 4096 # bytes 2048 samples at 16-bit mono
INTERVAL = 0.1 # seconds between scope updates
last_scope = time.monotonic()
except Exception as e:
logger.error(f"Failed to log message: {e}")
def _encode_scope_waveform(samples: tuple[int, ...], window_size: int = 256) -> list[int]:
"""Compress recent PCM samples into a signed 8-bit waveform for SSE."""
if not samples:
return []
window = samples[-window_size:] if len(samples) > window_size else samples
waveform: list[int] = []
for sample in window:
# Convert int16 PCM to int8 range for lightweight transport.
packed = int(round(sample / 256))
waveform.append(max(-127, min(127, packed)))
return waveform
def audio_relay_thread(
rtl_stdout,
multimon_stdin,
output_queue: queue.Queue,
stop_event: threading.Event,
) -> None:
"""Relay audio from rtl_fm to multimon-ng while computing signal levels.
Reads raw 16-bit LE PCM from *rtl_stdout*, writes every chunk straight
through to *multimon_stdin*, and every ~100 ms pushes an RMS / peak scope
event plus a compact waveform sample onto *output_queue*.
"""
CHUNK = 4096 # bytes 2048 samples at 16-bit mono
INTERVAL = 0.1 # seconds between scope updates
last_scope = time.monotonic()
try:
while not stop_event.is_set():
@@ -160,16 +161,16 @@ def audio_relay_thread(
if n_samples == 0:
continue
samples = struct.unpack(f'<{n_samples}h', data[:n_samples * 2])
peak = max(abs(s) for s in samples)
rms = int(math.sqrt(sum(s * s for s in samples) / n_samples))
output_queue.put_nowait({
'type': 'scope',
'rms': rms,
'peak': peak,
'waveform': _encode_scope_waveform(samples),
})
except (struct.error, ValueError, queue.Full):
pass
peak = max(abs(s) for s in samples)
rms = int(math.sqrt(sum(s * s for s in samples) / n_samples))
output_queue.put_nowait({
'type': 'scope',
'rms': rms,
'peak': peak,
'waveform': _encode_scope_waveform(samples),
})
except (struct.error, ValueError, queue.Full):
pass
except Exception as e:
logger.debug(f"Audio relay error: {e}")
finally:
@@ -220,7 +221,7 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
except Exception as e:
app_module.output_queue.put({'type': 'error', 'text': str(e)})
finally:
global pager_active_device
global pager_active_device, pager_active_sdr_type
try:
os.close(master_fd)
except OSError:
@@ -249,13 +250,14 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
app_module.current_process = None
# Release SDR device
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr')
pager_active_device = None
pager_active_sdr_type = None
@pager_bp.route('/start', methods=['POST'])
def start_decoding() -> Response:
global pager_active_device
global pager_active_device, pager_active_sdr_type
with app_module.process_lock:
if app_module.current_process:
@@ -284,10 +286,13 @@ def start_decoding() -> Response:
rtl_tcp_host = data.get('rtl_tcp_host')
rtl_tcp_port = data.get('rtl_tcp_port', 1234)
# Get SDR type early so we can pass it to claim/release
sdr_type_str = data.get('sdr_type', 'rtlsdr')
# Claim local device if not using remote rtl_tcp
if not rtl_tcp_host:
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'pager')
error = app_module.claim_sdr_device(device_int, 'pager', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -295,14 +300,16 @@ def start_decoding() -> Response:
'message': error
}), 409
pager_active_device = device_int
pager_active_sdr_type = sdr_type_str
# Validate protocols
valid_protocols = ['POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX']
protocols = data.get('protocols', valid_protocols)
if not isinstance(protocols, list):
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr')
pager_active_device = None
pager_active_sdr_type = None
return jsonify({'status': 'error', 'message': 'Protocols must be a list'}), 400
protocols = [p for p in protocols if p in valid_protocols]
if not protocols:
@@ -327,8 +334,7 @@ def start_decoding() -> Response:
elif proto == 'FLEX':
decoders.extend(['-a', 'FLEX'])
# Get SDR type and build command via abstraction layer
sdr_type_str = data.get('sdr_type', 'rtlsdr')
# Build command via SDR abstraction layer
try:
sdr_type = SDRType(sdr_type_str)
except ValueError:
@@ -443,8 +449,9 @@ def start_decoding() -> Response:
pass
# Release device on failure
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr')
pager_active_device = None
pager_active_sdr_type = None
return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
except Exception as e:
# Kill orphaned rtl_fm process if it was started
@@ -458,14 +465,15 @@ def start_decoding() -> Response:
pass
# Release device on failure
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr')
pager_active_device = None
pager_active_sdr_type = None
return jsonify({'status': 'error', 'message': str(e)})
@pager_bp.route('/stop', methods=['POST'])
def stop_decoding() -> Response:
global pager_active_device
global pager_active_device, pager_active_sdr_type
with app_module.process_lock:
if app_module.current_process:
@@ -502,8 +510,9 @@ def stop_decoding() -> Response:
# Release device from registry
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr')
pager_active_device = None
pager_active_sdr_type = None
return jsonify({'status': 'stopped'})
@@ -553,22 +562,22 @@ def toggle_logging() -> Response:
return jsonify({'logging': app_module.logging_enabled, 'log_file': app_module.log_file_path})
@pager_bp.route('/stream')
def stream() -> Response:
def _on_msg(msg: dict[str, Any]) -> None:
process_event('pager', msg, msg.get('type'))
response = Response(
sse_stream_fanout(
source_queue=app_module.output_queue,
channel_key='pager',
timeout=1.0,
keepalive_interval=30.0,
on_message=_on_msg,
),
mimetype='text/event-stream',
)
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
response.headers['Connection'] = 'keep-alive'
return response
@pager_bp.route('/stream')
def stream() -> Response:
def _on_msg(msg: dict[str, Any]) -> None:
process_event('pager', msg, msg.get('type'))
response = Response(
sse_stream_fanout(
source_queue=app_module.output_queue,
channel_key='pager',
timeout=1.0,
keepalive_interval=30.0,
on_message=_on_msg,
),
mimetype='text/event-stream',
)
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
response.headers['Connection'] = 'keep-alive'
return response

View File

@@ -1,5 +1,5 @@
"""RTL_433 sensor monitoring routes."""
"""RTL_433 sensor monitoring routes."""
from __future__ import annotations
import json
@@ -10,25 +10,26 @@ import threading
import time
from datetime import datetime
from typing import Any, Generator
from flask import Blueprint, jsonify, request, Response
import app as app_module
from utils.logging import sensor_logger as logger
from utils.validation import (
validate_frequency, validate_device_index, validate_gain, validate_ppm,
validate_rtl_tcp_host, validate_rtl_tcp_port
)
from flask import Blueprint, jsonify, request, Response
import app as app_module
from utils.logging import sensor_logger as logger
from utils.validation import (
validate_frequency, validate_device_index, validate_gain, validate_ppm,
validate_rtl_tcp_host, validate_rtl_tcp_port
)
from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType
sensor_bp = Blueprint('sensor', __name__)
# Track which device is being used
sensor_active_device: int | None = None
from utils.event_pipeline import process_event
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType
sensor_bp = Blueprint('sensor', __name__)
# Track which device is being used
sensor_active_device: int | None = None
sensor_active_sdr_type: str | None = None
# RSSI history per device (model_id -> list of (timestamp, rssi))
sensor_rssi_history: dict[str, list[tuple[float, float]]] = {}
_MAX_RSSI_HISTORY = 60
@@ -65,36 +66,36 @@ def _build_scope_waveform(rssi: float, snr: float, noise: float, points: int = 2
def stream_sensor_output(process: subprocess.Popen[bytes]) -> None:
"""Stream rtl_433 JSON output to queue."""
try:
app_module.sensor_queue.put({'type': 'status', 'text': 'started'})
for line in iter(process.stdout.readline, b''):
line = line.decode('utf-8', errors='replace').strip()
if not line:
continue
try:
# rtl_433 outputs JSON objects, one per line
data = json.loads(line)
data['type'] = 'sensor'
app_module.sensor_queue.put(data)
# Track RSSI history per device
_model = data.get('model', '')
_dev_id = data.get('id', '')
_rssi_val = data.get('rssi')
if _rssi_val is not None and _model:
_hist_key = f"{_model}_{_dev_id}"
hist = sensor_rssi_history.setdefault(_hist_key, [])
hist.append((time.time(), float(_rssi_val)))
if len(hist) > _MAX_RSSI_HISTORY:
del hist[: len(hist) - _MAX_RSSI_HISTORY]
# Push scope event when signal level data is present
rssi = data.get('rssi')
snr = data.get('snr')
noise = data.get('noise')
"""Stream rtl_433 JSON output to queue."""
try:
app_module.sensor_queue.put({'type': 'status', 'text': 'started'})
for line in iter(process.stdout.readline, b''):
line = line.decode('utf-8', errors='replace').strip()
if not line:
continue
try:
# rtl_433 outputs JSON objects, one per line
data = json.loads(line)
data['type'] = 'sensor'
app_module.sensor_queue.put(data)
# Track RSSI history per device
_model = data.get('model', '')
_dev_id = data.get('id', '')
_rssi_val = data.get('rssi')
if _rssi_val is not None and _model:
_hist_key = f"{_model}_{_dev_id}"
hist = sensor_rssi_history.setdefault(_hist_key, [])
hist.append((time.time(), float(_rssi_val)))
if len(hist) > _MAX_RSSI_HISTORY:
del hist[: len(hist) - _MAX_RSSI_HISTORY]
# Push scope event when signal level data is present
rssi = data.get('rssi')
snr = data.get('snr')
noise = data.get('noise')
if rssi is not None or snr is not None:
try:
rssi_value = float(rssi) if rssi is not None else 0.0
@@ -113,204 +114,211 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None:
})
except (TypeError, ValueError, queue.Full):
pass
# Log if enabled
if app_module.logging_enabled:
try:
with open(app_module.log_file_path, 'a') as f:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"{timestamp} | {data.get('model', 'Unknown')} | {json.dumps(data)}\n")
except Exception:
pass
except json.JSONDecodeError:
# Not JSON, send as raw
app_module.sensor_queue.put({'type': 'raw', 'text': line})
except Exception as e:
app_module.sensor_queue.put({'type': 'error', 'text': str(e)})
finally:
global sensor_active_device
# Ensure process is terminated
try:
process.terminate()
process.wait(timeout=2)
except Exception:
try:
process.kill()
except Exception:
pass
unregister_process(process)
app_module.sensor_queue.put({'type': 'status', 'text': 'stopped'})
with app_module.sensor_lock:
app_module.sensor_process = None
# Release SDR device
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
@sensor_bp.route('/sensor/status')
def sensor_status() -> Response:
"""Check if sensor decoder is currently running."""
with app_module.sensor_lock:
running = app_module.sensor_process is not None and app_module.sensor_process.poll() is None
return jsonify({'running': running})
@sensor_bp.route('/start_sensor', methods=['POST'])
def start_sensor() -> Response:
global sensor_active_device
with app_module.sensor_lock:
if app_module.sensor_process:
return jsonify({'status': 'error', 'message': 'Sensor already running'}), 409
data = request.json or {}
# Validate inputs
try:
freq = validate_frequency(data.get('frequency', '433.92'))
gain = validate_gain(data.get('gain', '0'))
ppm = validate_ppm(data.get('ppm', '0'))
device = validate_device_index(data.get('device', '0'))
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
# Check for rtl_tcp (remote SDR) connection
rtl_tcp_host = data.get('rtl_tcp_host')
rtl_tcp_port = data.get('rtl_tcp_port', 1234)
# Claim local device if not using remote rtl_tcp
if not rtl_tcp_host:
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'sensor')
if error:
return jsonify({
'status': 'error',
'error_type': 'DEVICE_BUSY',
'message': error
}), 409
sensor_active_device = device_int
# Clear queue
while not app_module.sensor_queue.empty():
try:
app_module.sensor_queue.get_nowait()
except queue.Empty:
break
# Get SDR type and build command via abstraction layer
sdr_type_str = data.get('sdr_type', 'rtlsdr')
try:
sdr_type = SDRType(sdr_type_str)
except ValueError:
sdr_type = SDRType.RTL_SDR
if rtl_tcp_host:
# Validate and create network device
try:
rtl_tcp_host = validate_rtl_tcp_host(rtl_tcp_host)
rtl_tcp_port = validate_rtl_tcp_port(rtl_tcp_port)
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
sdr_device = SDRFactory.create_network_device(rtl_tcp_host, rtl_tcp_port)
logger.info(f"Using remote SDR: rtl_tcp://{rtl_tcp_host}:{rtl_tcp_port}")
else:
# Create local device object
sdr_device = SDRFactory.create_default_device(sdr_type, index=device)
builder = SDRFactory.get_builder(sdr_device.sdr_type)
# Build ISM band decoder command
bias_t = data.get('bias_t', False)
cmd = builder.build_ism_command(
device=sdr_device,
frequency_mhz=freq,
gain=float(gain) if gain and gain != 0 else None,
ppm=int(ppm) if ppm and ppm != 0 else None,
bias_t=bias_t
)
full_cmd = ' '.join(cmd)
logger.info(f"Running: {full_cmd}")
# Add signal level metadata so the frontend scope can display RSSI/SNR
# Disable stats reporting to suppress "row count limit 50 reached" warnings
cmd.extend(['-M', 'level', '-M', 'stats:0'])
try:
app_module.sensor_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(app_module.sensor_process)
# Start output thread
thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,))
thread.daemon = True
thread.start()
# Monitor stderr
# Filter noisy rtl_433 diagnostics that aren't useful to display
_stderr_noise = (
'bitbuffer_add_bit',
'row count limit',
)
def monitor_stderr():
for line in app_module.sensor_process.stderr:
err = line.decode('utf-8', errors='replace').strip()
if err and not any(noise in err for noise in _stderr_noise):
logger.debug(f"[rtl_433] {err}")
app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'})
stderr_thread = threading.Thread(target=monitor_stderr)
stderr_thread.daemon = True
stderr_thread.start()
app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
return jsonify({'status': 'started', 'command': full_cmd})
except FileNotFoundError:
# Release device on failure
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'})
except Exception as e:
# Release device on failure
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
return jsonify({'status': 'error', 'message': str(e)})
@sensor_bp.route('/stop_sensor', methods=['POST'])
def stop_sensor() -> Response:
global sensor_active_device
with app_module.sensor_lock:
if app_module.sensor_process:
app_module.sensor_process.terminate()
try:
app_module.sensor_process.wait(timeout=2)
except subprocess.TimeoutExpired:
app_module.sensor_process.kill()
app_module.sensor_process = None
# Release device from registry
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
return jsonify({'status': 'stopped'})
return jsonify({'status': 'not_running'})
# Log if enabled
if app_module.logging_enabled:
try:
with open(app_module.log_file_path, 'a') as f:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"{timestamp} | {data.get('model', 'Unknown')} | {json.dumps(data)}\n")
except Exception:
pass
except json.JSONDecodeError:
# Not JSON, send as raw
app_module.sensor_queue.put({'type': 'raw', 'text': line})
except Exception as e:
app_module.sensor_queue.put({'type': 'error', 'text': str(e)})
finally:
global sensor_active_device, sensor_active_sdr_type
# Ensure process is terminated
try:
process.terminate()
process.wait(timeout=2)
except Exception:
try:
process.kill()
except Exception:
pass
unregister_process(process)
app_module.sensor_queue.put({'type': 'status', 'text': 'stopped'})
with app_module.sensor_lock:
app_module.sensor_process = None
# Release SDR device
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr')
sensor_active_device = None
sensor_active_sdr_type = None
@sensor_bp.route('/sensor/status')
def sensor_status() -> Response:
"""Check if sensor decoder is currently running."""
with app_module.sensor_lock:
running = app_module.sensor_process is not None and app_module.sensor_process.poll() is None
return jsonify({'running': running})
@sensor_bp.route('/start_sensor', methods=['POST'])
def start_sensor() -> Response:
global sensor_active_device, sensor_active_sdr_type
with app_module.sensor_lock:
if app_module.sensor_process:
return jsonify({'status': 'error', 'message': 'Sensor already running'}), 409
data = request.json or {}
# Validate inputs
try:
freq = validate_frequency(data.get('frequency', '433.92'))
gain = validate_gain(data.get('gain', '0'))
ppm = validate_ppm(data.get('ppm', '0'))
device = validate_device_index(data.get('device', '0'))
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
# Check for rtl_tcp (remote SDR) connection
rtl_tcp_host = data.get('rtl_tcp_host')
rtl_tcp_port = data.get('rtl_tcp_port', 1234)
# Get SDR type early so we can pass it to claim/release
sdr_type_str = data.get('sdr_type', 'rtlsdr')
# Claim local device if not using remote rtl_tcp
if not rtl_tcp_host:
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'sensor', sdr_type_str)
if error:
return jsonify({
'status': 'error',
'error_type': 'DEVICE_BUSY',
'message': error
}), 409
sensor_active_device = device_int
sensor_active_sdr_type = sdr_type_str
# Clear queue
while not app_module.sensor_queue.empty():
try:
app_module.sensor_queue.get_nowait()
except queue.Empty:
break
# Build command via SDR abstraction layer
try:
sdr_type = SDRType(sdr_type_str)
except ValueError:
sdr_type = SDRType.RTL_SDR
if rtl_tcp_host:
# Validate and create network device
try:
rtl_tcp_host = validate_rtl_tcp_host(rtl_tcp_host)
rtl_tcp_port = validate_rtl_tcp_port(rtl_tcp_port)
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
sdr_device = SDRFactory.create_network_device(rtl_tcp_host, rtl_tcp_port)
logger.info(f"Using remote SDR: rtl_tcp://{rtl_tcp_host}:{rtl_tcp_port}")
else:
# Create local device object
sdr_device = SDRFactory.create_default_device(sdr_type, index=device)
builder = SDRFactory.get_builder(sdr_device.sdr_type)
# Build ISM band decoder command
bias_t = data.get('bias_t', False)
cmd = builder.build_ism_command(
device=sdr_device,
frequency_mhz=freq,
gain=float(gain) if gain and gain != 0 else None,
ppm=int(ppm) if ppm and ppm != 0 else None,
bias_t=bias_t
)
full_cmd = ' '.join(cmd)
logger.info(f"Running: {full_cmd}")
# Add signal level metadata so the frontend scope can display RSSI/SNR
# Disable stats reporting to suppress "row count limit 50 reached" warnings
cmd.extend(['-M', 'level', '-M', 'stats:0'])
try:
app_module.sensor_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(app_module.sensor_process)
# Start output thread
thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,))
thread.daemon = True
thread.start()
# Monitor stderr
# Filter noisy rtl_433 diagnostics that aren't useful to display
_stderr_noise = (
'bitbuffer_add_bit',
'row count limit',
)
def monitor_stderr():
for line in app_module.sensor_process.stderr:
err = line.decode('utf-8', errors='replace').strip()
if err and not any(noise in err for noise in _stderr_noise):
logger.debug(f"[rtl_433] {err}")
app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'})
stderr_thread = threading.Thread(target=monitor_stderr)
stderr_thread.daemon = True
stderr_thread.start()
app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
return jsonify({'status': 'started', 'command': full_cmd})
except FileNotFoundError:
# Release device on failure
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr')
sensor_active_device = None
sensor_active_sdr_type = None
return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'})
except Exception as e:
# Release device on failure
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr')
sensor_active_device = None
sensor_active_sdr_type = None
return jsonify({'status': 'error', 'message': str(e)})
@sensor_bp.route('/stop_sensor', methods=['POST'])
def stop_sensor() -> Response:
global sensor_active_device, sensor_active_sdr_type
with app_module.sensor_lock:
if app_module.sensor_process:
app_module.sensor_process.terminate()
try:
app_module.sensor_process.wait(timeout=2)
except subprocess.TimeoutExpired:
app_module.sensor_process.kill()
app_module.sensor_process = None
# Release device from registry
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr')
sensor_active_device = None
sensor_active_sdr_type = None
return jsonify({'status': 'stopped'})
return jsonify({'status': 'not_running'})
@sensor_bp.route('/stream_sensor')
def stream_sensor() -> Response:
def _on_msg(msg: dict[str, Any]) -> None:
@@ -330,12 +338,12 @@ def stream_sensor() -> Response:
response.headers['X-Accel-Buffering'] = 'no'
response.headers['Connection'] = 'keep-alive'
return response
@sensor_bp.route('/sensor/rssi_history')
def get_rssi_history() -> Response:
"""Return RSSI history for all tracked sensor devices."""
result = {}
for key, entries in sensor_rssi_history.items():
result[key] = [{'t': round(t, 1), 'rssi': rssi} for t, rssi in entries]
return jsonify({'status': 'success', 'devices': result})
@sensor_bp.route('/sensor/rssi_history')
def get_rssi_history() -> Response:
"""Return RSSI history for all tracked sensor devices."""
result = {}
for key, entries in sensor_rssi_history.items():
result[key] = [{'t': round(t, 1), 'rssi': rssi} for t, rssi in entries]
return jsonify({'status': 'success', 'devices': result})

View File

@@ -1,17 +1,17 @@
"""VDL2 aircraft datalink routes."""
from __future__ import annotations
import io
import json
import os
import platform
import pty
import queue
import shutil
import subprocess
import threading
import time
from __future__ import annotations
import io
import json
import os
import platform
import pty
import queue
import shutil
import subprocess
import threading
import time
from datetime import datetime
from typing import Generator
@@ -21,7 +21,7 @@ import app as app_module
from utils.logging import sensor_logger as logger
from utils.validation import validate_device_index, validate_gain, validate_ppm
from utils.sdr import SDRFactory, SDRType
from utils.sse import sse_stream_fanout
from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
from utils.constants import (
PROCESS_TERMINATE_TIMEOUT,
@@ -48,6 +48,7 @@ vdl2_last_message_time = None
# Track which device is being used
vdl2_active_device: int | None = None
vdl2_active_sdr_type: str | None = None
def find_dumpvdl2():
@@ -55,22 +56,22 @@ def find_dumpvdl2():
return shutil.which('dumpvdl2')
def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) -> None:
"""Stream dumpvdl2 JSON output to queue."""
global vdl2_message_count, vdl2_last_message_time
try:
app_module.vdl2_queue.put({'type': 'status', 'status': 'started'})
# Use appropriate sentinel based on mode (text mode for pty on macOS)
sentinel = '' if is_text_mode else b''
for line in iter(process.stdout.readline, sentinel):
if is_text_mode:
line = line.strip()
else:
line = line.decode('utf-8', errors='replace').strip()
if not line:
continue
def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) -> None:
"""Stream dumpvdl2 JSON output to queue."""
global vdl2_message_count, vdl2_last_message_time
try:
app_module.vdl2_queue.put({'type': 'status', 'status': 'started'})
# Use appropriate sentinel based on mode (text mode for pty on macOS)
sentinel = '' if is_text_mode else b''
for line in iter(process.stdout.readline, sentinel):
if is_text_mode:
line = line.strip()
else:
line = line.decode('utf-8', errors='replace').strip()
if not line:
continue
try:
data = json.loads(line)
@@ -110,7 +111,7 @@ def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) ->
logger.error(f"VDL2 stream error: {e}")
app_module.vdl2_queue.put({'type': 'error', 'message': str(e)})
finally:
global vdl2_active_device
global vdl2_active_device, vdl2_active_sdr_type
# Ensure process is terminated
try:
process.terminate()
@@ -126,8 +127,9 @@ def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) ->
app_module.vdl2_process = None
# Release SDR device
if vdl2_active_device is not None:
app_module.release_sdr_device(vdl2_active_device)
app_module.release_sdr_device(vdl2_active_device, vdl2_active_sdr_type or 'rtlsdr')
vdl2_active_device = None
vdl2_active_sdr_type = None
@vdl2_bp.route('/tools')
@@ -159,7 +161,7 @@ def vdl2_status() -> Response:
@vdl2_bp.route('/start', methods=['POST'])
def start_vdl2() -> Response:
"""Start VDL2 decoder."""
global vdl2_message_count, vdl2_last_message_time, vdl2_active_device
global vdl2_message_count, vdl2_last_message_time, vdl2_active_device, vdl2_active_sdr_type
with app_module.vdl2_lock:
if app_module.vdl2_process and app_module.vdl2_process.poll() is None:
@@ -186,9 +188,16 @@ def start_vdl2() -> Response:
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
# Resolve SDR type for device selection
sdr_type_str = data.get('sdr_type', 'rtlsdr')
try:
sdr_type = SDRType(sdr_type_str)
except ValueError:
sdr_type = SDRType.RTL_SDR
# Check if device is available
device_int = int(device)
error = app_module.claim_sdr_device(device_int, 'vdl2')
error = app_module.claim_sdr_device(device_int, 'vdl2', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -197,6 +206,7 @@ def start_vdl2() -> Response:
}), 409
vdl2_active_device = device_int
vdl2_active_sdr_type = sdr_type_str
# Get frequencies - use provided or defaults
# dumpvdl2 expects frequencies in Hz (integers)
@@ -215,13 +225,6 @@ def start_vdl2() -> Response:
vdl2_message_count = 0
vdl2_last_message_time = None
# Resolve SDR type for device selection
sdr_type_str = data.get('sdr_type', 'rtlsdr')
try:
sdr_type = SDRType(sdr_type_str)
except ValueError:
sdr_type = SDRType.RTL_SDR
is_soapy = sdr_type not in (SDRType.RTL_SDR,)
# Build dumpvdl2 command
@@ -252,28 +255,28 @@ def start_vdl2() -> Response:
logger.info(f"Starting VDL2 decoder: {' '.join(cmd)}")
try:
is_text_mode = False
# On macOS, use pty to avoid stdout buffering issues
if platform.system() == 'Darwin':
master_fd, slave_fd = pty.openpty()
process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=subprocess.PIPE,
start_new_session=True
)
os.close(slave_fd)
# Wrap master_fd as a text file for line-buffered reading
process.stdout = io.open(master_fd, 'r', buffering=1)
is_text_mode = True
else:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True
)
is_text_mode = False
# On macOS, use pty to avoid stdout buffering issues
if platform.system() == 'Darwin':
master_fd, slave_fd = pty.openpty()
process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=subprocess.PIPE,
start_new_session=True
)
os.close(slave_fd)
# Wrap master_fd as a text file for line-buffered reading
process.stdout = io.open(master_fd, 'r', buffering=1)
is_text_mode = True
else:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True
)
# Wait briefly to check if process started
time.sleep(PROCESS_START_WAIT)
@@ -281,8 +284,9 @@ def start_vdl2() -> Response:
if process.poll() is not None:
# Process died - release device
if vdl2_active_device is not None:
app_module.release_sdr_device(vdl2_active_device)
app_module.release_sdr_device(vdl2_active_device, vdl2_active_sdr_type or 'rtlsdr')
vdl2_active_device = None
vdl2_active_sdr_type = None
stderr = ''
if process.stderr:
stderr = process.stderr.read().decode('utf-8', errors='replace')
@@ -295,12 +299,12 @@ def start_vdl2() -> Response:
app_module.vdl2_process = process
register_process(process)
# Start output streaming thread
thread = threading.Thread(
target=stream_vdl2_output,
args=(process, is_text_mode),
daemon=True
)
# Start output streaming thread
thread = threading.Thread(
target=stream_vdl2_output,
args=(process, is_text_mode),
daemon=True
)
thread.start()
return jsonify({
@@ -313,8 +317,9 @@ def start_vdl2() -> Response:
except Exception as e:
# Release device on failure
if vdl2_active_device is not None:
app_module.release_sdr_device(vdl2_active_device)
app_module.release_sdr_device(vdl2_active_device, vdl2_active_sdr_type or 'rtlsdr')
vdl2_active_device = None
vdl2_active_sdr_type = None
logger.error(f"Failed to start VDL2 decoder: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@@ -322,7 +327,7 @@ def start_vdl2() -> Response:
@vdl2_bp.route('/stop', methods=['POST'])
def stop_vdl2() -> Response:
"""Stop VDL2 decoder."""
global vdl2_active_device
global vdl2_active_device, vdl2_active_sdr_type
with app_module.vdl2_lock:
if not app_module.vdl2_process:
@@ -343,31 +348,32 @@ def stop_vdl2() -> Response:
# Release device from registry
if vdl2_active_device is not None:
app_module.release_sdr_device(vdl2_active_device)
app_module.release_sdr_device(vdl2_active_device, vdl2_active_sdr_type or 'rtlsdr')
vdl2_active_device = None
vdl2_active_sdr_type = None
return jsonify({'status': 'stopped'})
@vdl2_bp.route('/stream')
def stream_vdl2() -> Response:
"""SSE stream for VDL2 messages."""
def _on_msg(msg: dict[str, Any]) -> None:
process_event('vdl2', msg, msg.get('type'))
response = Response(
sse_stream_fanout(
source_queue=app_module.vdl2_queue,
channel_key='vdl2',
timeout=SSE_QUEUE_TIMEOUT,
keepalive_interval=SSE_KEEPALIVE_INTERVAL,
on_message=_on_msg,
),
mimetype='text/event-stream',
)
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
return response
@vdl2_bp.route('/stream')
def stream_vdl2() -> Response:
"""SSE stream for VDL2 messages."""
def _on_msg(msg: dict[str, Any]) -> None:
process_event('vdl2', msg, msg.get('type'))
response = Response(
sse_stream_fanout(
source_queue=app_module.vdl2_queue,
channel_key='vdl2',
timeout=SSE_QUEUE_TIMEOUT,
keepalive_interval=SSE_KEEPALIVE_INTERVAL,
on_message=_on_msg,
),
mimetype='text/event-stream',
)
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
return response
@vdl2_bp.route('/frequencies')

View File

@@ -367,6 +367,7 @@ def init_waterfall_websocket(app: Flask):
reader_thread = None
stop_event = threading.Event()
claimed_device = None
claimed_sdr_type = 'rtlsdr'
my_generation = None # tracks which capture generation this handler owns
capture_center_mhz = 0.0
capture_start_freq = 0.0
@@ -430,8 +431,9 @@ def init_waterfall_websocket(app: Flask):
unregister_process(iq_process)
iq_process = None
if claimed_device is not None:
app_module.release_sdr_device(claimed_device)
app_module.release_sdr_device(claimed_device, claimed_sdr_type)
claimed_device = None
claimed_sdr_type = 'rtlsdr'
_set_shared_capture_state(running=False, generation=my_generation)
my_generation = None
stop_event.clear()
@@ -513,7 +515,7 @@ def init_waterfall_websocket(app: Flask):
max_claim_attempts = 4 if was_restarting else 1
claim_err = None
for _claim_attempt in range(max_claim_attempts):
claim_err = app_module.claim_sdr_device(device_index, 'waterfall')
claim_err = app_module.claim_sdr_device(device_index, 'waterfall', sdr_type_str)
if not claim_err:
break
if _claim_attempt < max_claim_attempts - 1:
@@ -526,6 +528,7 @@ def init_waterfall_websocket(app: Flask):
}))
continue
claimed_device = device_index
claimed_sdr_type = sdr_type_str
# Build I/Q capture command
try:
@@ -539,8 +542,9 @@ def init_waterfall_websocket(app: Flask):
bias_t=bias_t,
)
except NotImplementedError as e:
app_module.release_sdr_device(device_index)
app_module.release_sdr_device(device_index, sdr_type_str)
claimed_device = None
claimed_sdr_type = 'rtlsdr'
ws.send(json.dumps({
'status': 'error',
'message': str(e),
@@ -549,8 +553,9 @@ def init_waterfall_websocket(app: Flask):
# Pre-flight: check the capture binary exists
if not shutil.which(iq_cmd[0]):
app_module.release_sdr_device(device_index)
app_module.release_sdr_device(device_index, sdr_type_str)
claimed_device = None
claimed_sdr_type = 'rtlsdr'
ws.send(json.dumps({
'status': 'error',
'message': f'Required tool "{iq_cmd[0]}" not found. Install SoapySDR tools (rx_sdr).',
@@ -602,8 +607,9 @@ def init_waterfall_websocket(app: Flask):
safe_terminate(iq_process)
unregister_process(iq_process)
iq_process = None
app_module.release_sdr_device(device_index)
app_module.release_sdr_device(device_index, sdr_type_str)
claimed_device = None
claimed_sdr_type = 'rtlsdr'
ws.send(json.dumps({
'status': 'error',
'message': f'Failed to start I/Q capture: {e}',
@@ -806,8 +812,9 @@ def init_waterfall_websocket(app: Flask):
unregister_process(iq_process)
iq_process = None
if claimed_device is not None:
app_module.release_sdr_device(claimed_device)
app_module.release_sdr_device(claimed_device, claimed_sdr_type)
claimed_device = None
claimed_sdr_type = 'rtlsdr'
_set_shared_capture_state(running=False, generation=my_generation)
my_generation = None
stop_event.clear()
@@ -825,7 +832,7 @@ def init_waterfall_websocket(app: Flask):
safe_terminate(iq_process)
unregister_process(iq_process)
if claimed_device is not None:
app_module.release_sdr_device(claimed_device)
app_module.release_sdr_device(claimed_device, claimed_sdr_type)
_set_shared_capture_state(running=False, generation=my_generation)
# Complete WebSocket close handshake, then shut down the
# raw socket so Werkzeug cannot write its HTTP 200 response

View File

@@ -33,11 +33,12 @@ _wefax_queue: queue.Queue = queue.Queue(maxsize=100)
# Track active SDR device
wefax_active_device: int | None = None
wefax_active_sdr_type: str | None = None
def _progress_callback(data: dict) -> None:
"""Callback to queue progress updates for SSE stream."""
global wefax_active_device
global wefax_active_device, wefax_active_sdr_type
try:
_wefax_queue.put_nowait(data)
@@ -56,8 +57,9 @@ def _progress_callback(data: dict) -> None:
and data.get('status') in ('complete', 'error', 'stopped')
and wefax_active_device is not None
):
app_module.release_sdr_device(wefax_active_device)
app_module.release_sdr_device(wefax_active_device, wefax_active_sdr_type or 'rtlsdr')
wefax_active_device = None
wefax_active_sdr_type = None
@wefax_bp.route('/status')
@@ -169,9 +171,9 @@ def start_decoder():
}), 400
# Claim SDR device
global wefax_active_device
global wefax_active_device, wefax_active_sdr_type
device_int = int(device_index)
error = app_module.claim_sdr_device(device_int, 'wefax')
error = app_module.claim_sdr_device(device_int, 'wefax', sdr_type_str)
if error:
return jsonify({
'status': 'error',
@@ -194,6 +196,7 @@ def start_decoder():
if success:
wefax_active_device = device_int
wefax_active_sdr_type = sdr_type_str
return jsonify({
'status': 'started',
'frequency_khz': frequency_khz,
@@ -209,7 +212,7 @@ def start_decoder():
'device': device_int,
})
else:
app_module.release_sdr_device(device_int)
app_module.release_sdr_device(device_int, sdr_type_str)
return jsonify({
'status': 'error',
'message': 'Failed to start decoder',
@@ -219,13 +222,14 @@ def start_decoder():
@wefax_bp.route('/stop', methods=['POST'])
def stop_decoder():
"""Stop WeFax decoder."""
global wefax_active_device
global wefax_active_device, wefax_active_sdr_type
decoder = get_wefax_decoder()
decoder.stop()
if wefax_active_device is not None:
app_module.release_sdr_device(wefax_active_device)
app_module.release_sdr_device(wefax_active_device, wefax_active_sdr_type or 'rtlsdr')
wefax_active_device = None
wefax_active_sdr_type = None
return jsonify({'status': 'stopped'})

View File

@@ -1752,31 +1752,37 @@ ACARS: ${r.statistics.acarsMessages} messages`;
airbandSelect.innerHTML = '';
if (devices.length === 0) {
adsbSelect.innerHTML = '<option value="0">No SDR found</option>';
airbandSelect.innerHTML = '<option value="0">No SDR found</option>';
adsbSelect.innerHTML = '<option value="rtlsdr:0">No SDR found</option>';
airbandSelect.innerHTML = '<option value="rtlsdr:0">No SDR found</option>';
airbandSelect.disabled = true;
} else {
devices.forEach((dev, i) => {
const idx = dev.index !== undefined ? dev.index : i;
const sdrType = dev.sdr_type || 'rtlsdr';
const compositeVal = `${sdrType}:${idx}`;
const displayName = `SDR ${idx}: ${dev.name}`;
// Add to ADS-B selector
const adsbOpt = document.createElement('option');
adsbOpt.value = idx;
adsbOpt.value = compositeVal;
adsbOpt.dataset.sdrType = sdrType;
adsbOpt.dataset.index = idx;
adsbOpt.textContent = displayName;
adsbSelect.appendChild(adsbOpt);
// Add to Airband selector
const airbandOpt = document.createElement('option');
airbandOpt.value = idx;
airbandOpt.value = compositeVal;
airbandOpt.dataset.sdrType = sdrType;
airbandOpt.dataset.index = idx;
airbandOpt.textContent = displayName;
airbandSelect.appendChild(airbandOpt);
});
// Default: ADS-B uses first device, Airband uses second (if available)
adsbSelect.value = devices[0].index !== undefined ? devices[0].index : 0;
adsbSelect.value = adsbSelect.options[0]?.value || 'rtlsdr:0';
if (devices.length > 1) {
airbandSelect.value = devices[1].index !== undefined ? devices[1].index : 1;
airbandSelect.value = airbandSelect.options[1]?.value || airbandSelect.options[0]?.value || 'rtlsdr:0';
}
// Show warning if only one device
@@ -1787,8 +1793,8 @@ ACARS: ${r.statistics.acarsMessages} messages`;
}
})
.catch(() => {
document.getElementById('adsbDeviceSelect').innerHTML = '<option value="0">Error</option>';
document.getElementById('airbandDeviceSelect').innerHTML = '<option value="0">Error</option>';
document.getElementById('adsbDeviceSelect').innerHTML = '<option value="rtlsdr:0">Error</option>';
document.getElementById('airbandDeviceSelect').innerHTML = '<option value="rtlsdr:0">Error</option>';
});
}
@@ -2151,11 +2157,14 @@ sudo make install</code>
}
}
// Get selected ADS-B device
const adsbDevice = parseInt(document.getElementById('adsbDeviceSelect').value) || 0;
// Get selected ADS-B device (composite value "sdr_type:index")
const adsbSelectVal = document.getElementById('adsbDeviceSelect').value || 'rtlsdr:0';
const [adsbSdrType, adsbDeviceIdx] = adsbSelectVal.includes(':') ? adsbSelectVal.split(':') : ['rtlsdr', adsbSelectVal];
const adsbDevice = parseInt(adsbDeviceIdx) || 0;
const requestBody = {
device: adsbDevice,
sdr_type: adsbSdrType,
bias_t: getBiasTEnabled()
};
if (remoteConfig) {
@@ -2306,11 +2315,13 @@ sudo make install</code>
}
const sessionDevice = session.device_index;
const sessionSdrType = session.sdr_type || 'rtlsdr';
if (sessionDevice !== null && sessionDevice !== undefined) {
adsbActiveDevice = sessionDevice;
const adsbSelect = document.getElementById('adsbDeviceSelect');
if (adsbSelect) {
adsbSelect.value = sessionDevice;
// Use composite value to select the correct device+type
adsbSelect.value = `${sessionSdrType}:${sessionDevice}`;
}
}
@@ -3663,8 +3674,9 @@ sudo make install</code>
function startAcars() {
const acarsSelect = document.getElementById('acarsDeviceSelect');
const device = acarsSelect.value;
const sdr_type = acarsSelect.selectedOptions[0]?.dataset.sdrType || 'rtlsdr';
const compositeVal = acarsSelect.value || 'rtlsdr:0';
const [sdr_type, deviceIdx] = compositeVal.includes(':') ? compositeVal.split(':') : ['rtlsdr', compositeVal];
const device = deviceIdx;
const frequencies = getAcarsRegionFreqs();
// Check if using agent mode
@@ -3896,18 +3908,21 @@ sudo make install</code>
const select = document.getElementById('acarsDeviceSelect');
select.innerHTML = '';
if (devices.length === 0) {
select.innerHTML = '<option value="0">No SDR detected</option>';
select.innerHTML = '<option value="rtlsdr:0">No SDR detected</option>';
} else {
devices.forEach((d, i) => {
const opt = document.createElement('option');
opt.value = d.index || i;
opt.dataset.sdrType = d.sdr_type || 'rtlsdr';
opt.textContent = `SDR ${d.index || i}: ${d.name || d.type || 'SDR'}`;
const sdrType = d.sdr_type || 'rtlsdr';
const idx = d.index !== undefined ? d.index : i;
opt.value = `${sdrType}:${idx}`;
opt.dataset.sdrType = sdrType;
opt.dataset.index = idx;
opt.textContent = `SDR ${idx}: ${d.name || d.type || 'SDR'}`;
select.appendChild(opt);
});
// Default to device 1 if available (device 0 likely used for ADS-B)
if (devices.length > 1) {
select.value = '1';
select.value = select.options[1]?.value || select.options[0]?.value;
}
}
});
@@ -3998,8 +4013,9 @@ sudo make install</code>
function startVdl2() {
const vdl2Select = document.getElementById('vdl2DeviceSelect');
const device = vdl2Select.value;
const sdr_type = vdl2Select.selectedOptions[0]?.dataset.sdrType || 'rtlsdr';
const compositeVal = vdl2Select.value || 'rtlsdr:0';
const [sdr_type, deviceIdx] = compositeVal.includes(':') ? compositeVal.split(':') : ['rtlsdr', compositeVal];
const device = deviceIdx;
const frequencies = getVdl2RegionFreqs();
// Check if using agent mode
@@ -4419,17 +4435,20 @@ sudo make install</code>
const select = document.getElementById('vdl2DeviceSelect');
select.innerHTML = '';
if (devices.length === 0) {
select.innerHTML = '<option value="0">No SDR detected</option>';
select.innerHTML = '<option value="rtlsdr:0">No SDR detected</option>';
} else {
devices.forEach((d, i) => {
const opt = document.createElement('option');
opt.value = d.index || i;
opt.dataset.sdrType = d.sdr_type || 'rtlsdr';
opt.textContent = `SDR ${d.index || i}: ${d.name || d.type || 'SDR'}`;
const sdrType = d.sdr_type || 'rtlsdr';
const idx = d.index !== undefined ? d.index : i;
opt.value = `${sdrType}:${idx}`;
opt.dataset.sdrType = sdrType;
opt.dataset.index = idx;
opt.textContent = `SDR ${idx}: ${d.name || d.type || 'SDR'}`;
select.appendChild(opt);
});
if (devices.length > 1) {
select.value = '1';
select.value = select.options[1]?.value || select.options[0]?.value;
}
}
});
@@ -5403,13 +5422,16 @@ sudo make install</code>
select.innerHTML = '';
if (devices.length === 0) {
select.innerHTML = '<option value="0">No SDR found</option>';
select.innerHTML = '<option value="rtlsdr:0">No SDR found</option>';
} else {
devices.forEach(device => {
const opt = document.createElement('option');
opt.value = device.index;
opt.dataset.sdrType = device.sdr_type || 'rtlsdr';
opt.textContent = `SDR ${device.index}: ${device.name || device.type || 'SDR'}`;
const sdrType = device.sdr_type || 'rtlsdr';
const idx = device.index !== undefined ? device.index : 0;
opt.value = `${sdrType}:${idx}`;
opt.dataset.sdrType = sdrType;
opt.dataset.index = idx;
opt.textContent = `SDR ${idx}: ${device.name || device.type || 'SDR'}`;
select.appendChild(opt);
});
}

View File

@@ -5646,37 +5646,41 @@
let currentDeviceList = [];
// SDR Device Usage Tracking
// Tracks which mode is using which device index
// Tracks which mode is using which device (keyed by "sdr_type:index")
const sdrDeviceUsage = {
// deviceIndex: 'modeName' (e.g., 0: 'pager', 1: 'scanner')
// "sdr_type:index": 'modeName' (e.g., "rtlsdr:0": 'pager', "hackrf:0": 'scanner')
};
function getDeviceInUseBy(deviceIndex) {
return sdrDeviceUsage[deviceIndex] || null;
function getDeviceInUseBy(deviceIndex, sdrType) {
const key = `${sdrType || getSelectedSDRType()}:${deviceIndex}`;
return sdrDeviceUsage[key] || null;
}
function isDeviceInUse(deviceIndex) {
return sdrDeviceUsage[deviceIndex] !== undefined;
function isDeviceInUse(deviceIndex, sdrType) {
const key = `${sdrType || getSelectedSDRType()}:${deviceIndex}`;
return sdrDeviceUsage[key] !== undefined;
}
function reserveDevice(deviceIndex, modeName) {
sdrDeviceUsage[deviceIndex] = modeName;
function reserveDevice(deviceIndex, modeName, sdrType) {
const key = `${sdrType || getSelectedSDRType()}:${deviceIndex}`;
sdrDeviceUsage[key] = modeName;
updateDeviceSelectStatus();
}
function releaseDevice(modeName) {
for (const [idx, mode] of Object.entries(sdrDeviceUsage)) {
for (const [key, mode] of Object.entries(sdrDeviceUsage)) {
if (mode === modeName) {
delete sdrDeviceUsage[idx];
delete sdrDeviceUsage[key];
}
}
updateDeviceSelectStatus();
}
function getAvailableDevice() {
// Find first device not in use
// Find first device not in use (within selected SDR type)
const sdrType = getSelectedSDRType();
for (const device of currentDeviceList) {
if (!isDeviceInUse(device.index)) {
if ((device.sdr_type || 'rtlsdr') === sdrType && !isDeviceInUse(device.index, sdrType)) {
return device.index;
}
}
@@ -5688,10 +5692,11 @@
const select = document.getElementById('deviceSelect');
if (!select) return;
const sdrType = getSelectedSDRType();
const options = select.querySelectorAll('option');
options.forEach(opt => {
const idx = parseInt(opt.value);
const usedBy = getDeviceInUseBy(idx);
const usedBy = getDeviceInUseBy(idx, sdrType);
const baseName = opt.textContent.replace(/ \[.*\]$/, ''); // Remove existing status
if (usedBy) {
opt.textContent = `${baseName} [${usedBy.toUpperCase()}]`;

View File

@@ -257,8 +257,8 @@ class TestMorseLifecycleRoutes:
released_devices = []
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None)
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx))
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode, sdr_type='rtlsdr': None)
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx, sdr_type='rtlsdr': released_devices.append(idx))
class DummyDevice:
sdr_type = morse_routes.SDRType.RTL_SDR
@@ -337,8 +337,8 @@ class TestMorseLifecycleRoutes:
released_devices = []
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None)
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx))
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode, sdr_type='rtlsdr': None)
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx, sdr_type='rtlsdr': released_devices.append(idx))
class DummyDevice:
sdr_type = morse_routes.SDRType.RTL_SDR
@@ -421,8 +421,8 @@ class TestMorseLifecycleRoutes:
released_devices = []
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None)
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx))
monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode, sdr_type='rtlsdr': None)
monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx, sdr_type='rtlsdr': released_devices.append(idx))
class DummyDevice:
def __init__(self, index: int):

View File

@@ -54,72 +54,72 @@ class TestWeFaxStations:
from utils.wefax_stations import get_station
assert get_station('noj') is not None
def test_get_station_not_found(self):
"""get_station() should return None for unknown callsign."""
from utils.wefax_stations import get_station
assert get_station('XXXXX') is None
def test_resolve_tuning_frequency_auto_uses_carrier_for_known_station(self):
"""Known station frequencies default to carrier-list behavior in auto mode."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='auto',
)
assert math.isclose(tuned, 4296.1, abs_tol=1e-6)
assert reference == 'carrier'
assert offset_applied is True
def test_resolve_tuning_frequency_auto_preserves_unknown_station_input(self):
"""Ad-hoc frequencies (no station metadata) should be treated as dial."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='',
frequency_reference='auto',
)
assert math.isclose(tuned, 4298.0, abs_tol=1e-6)
assert reference == 'dial'
assert offset_applied is False
def test_resolve_tuning_frequency_dial_override(self):
"""Explicit dial reference must bypass USB alignment."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='dial',
)
assert math.isclose(tuned, 4298.0, abs_tol=1e-6)
assert reference == 'dial'
assert offset_applied is False
def test_resolve_tuning_frequency_rejects_invalid_reference(self):
"""Invalid frequency reference values should raise a validation error."""
from utils.wefax_stations import resolve_tuning_frequency_khz
try:
resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='invalid',
)
assert False, "Expected ValueError for invalid frequency_reference"
except ValueError as exc:
assert 'frequency_reference' in str(exc)
def test_station_frequencies_have_khz(self):
"""Each frequency entry must have 'khz' and 'description'."""
from utils.wefax_stations import load_stations
for station in load_stations():
for freq in station['frequencies']:
def test_get_station_not_found(self):
"""get_station() should return None for unknown callsign."""
from utils.wefax_stations import get_station
assert get_station('XXXXX') is None
def test_resolve_tuning_frequency_auto_uses_carrier_for_known_station(self):
"""Known station frequencies default to carrier-list behavior in auto mode."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='auto',
)
assert math.isclose(tuned, 4296.1, abs_tol=1e-6)
assert reference == 'carrier'
assert offset_applied is True
def test_resolve_tuning_frequency_auto_preserves_unknown_station_input(self):
"""Ad-hoc frequencies (no station metadata) should be treated as dial."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='',
frequency_reference='auto',
)
assert math.isclose(tuned, 4298.0, abs_tol=1e-6)
assert reference == 'dial'
assert offset_applied is False
def test_resolve_tuning_frequency_dial_override(self):
"""Explicit dial reference must bypass USB alignment."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='dial',
)
assert math.isclose(tuned, 4298.0, abs_tol=1e-6)
assert reference == 'dial'
assert offset_applied is False
def test_resolve_tuning_frequency_rejects_invalid_reference(self):
"""Invalid frequency reference values should raise a validation error."""
from utils.wefax_stations import resolve_tuning_frequency_khz
try:
resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='invalid',
)
assert False, "Expected ValueError for invalid frequency_reference"
except ValueError as exc:
assert 'frequency_reference' in str(exc)
def test_station_frequencies_have_khz(self):
"""Each frequency entry must have 'khz' and 'description'."""
from utils.wefax_stations import load_stations
for station in load_stations():
for freq in station['frequencies']:
assert 'khz' in freq, f"{station['callsign']} missing khz"
assert 'description' in freq, f"{station['callsign']} missing description"
assert isinstance(freq['khz'], (int, float))
@@ -281,7 +281,7 @@ class TestWeFaxDecoder:
# Route tests
# ---------------------------------------------------------------------------
class TestWeFaxRoutes:
class TestWeFaxRoutes:
"""WeFax route endpoint tests."""
def test_status(self, client):
@@ -390,11 +390,11 @@ class TestWeFaxRoutes:
data = response.get_json()
assert 'LPM' in data['message']
def test_start_success(self, client):
"""POST /wefax/start with valid params should succeed."""
_login_session(client)
mock_decoder = MagicMock()
mock_decoder.is_running = False
def test_start_success(self, client):
"""POST /wefax/start with valid params should succeed."""
_login_session(client)
mock_decoder = MagicMock()
mock_decoder.is_running = False
mock_decoder.start.return_value = True
with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder), \
@@ -411,46 +411,46 @@ class TestWeFaxRoutes:
content_type='application/json',
)
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'started'
assert data['frequency_khz'] == 4298
assert data['usb_offset_applied'] is True
assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6)
assert data['frequency_reference'] == 'carrier'
assert data['station'] == 'NOJ'
mock_decoder.start.assert_called_once()
start_kwargs = mock_decoder.start.call_args.kwargs
assert math.isclose(start_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6)
def test_start_respects_dial_reference_override(self, client):
"""POST /wefax/start with dial reference should not apply USB offset."""
_login_session(client)
mock_decoder = MagicMock()
mock_decoder.is_running = False
mock_decoder.start.return_value = True
with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder), \
patch('routes.wefax.app_module.claim_sdr_device', return_value=None):
response = client.post(
'/wefax/start',
data=json.dumps({
'frequency_khz': 4298,
'station': 'NOJ',
'device': 0,
'frequency_reference': 'dial',
}),
content_type='application/json',
)
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'started'
assert data['usb_offset_applied'] is False
assert math.isclose(data['tuned_frequency_khz'], 4298.0, abs_tol=1e-6)
assert data['frequency_reference'] == 'dial'
start_kwargs = mock_decoder.start.call_args.kwargs
assert math.isclose(start_kwargs['frequency_khz'], 4298.0, abs_tol=1e-6)
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'started'
assert data['frequency_khz'] == 4298
assert data['usb_offset_applied'] is True
assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6)
assert data['frequency_reference'] == 'carrier'
assert data['station'] == 'NOJ'
mock_decoder.start.assert_called_once()
start_kwargs = mock_decoder.start.call_args.kwargs
assert math.isclose(start_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6)
def test_start_respects_dial_reference_override(self, client):
"""POST /wefax/start with dial reference should not apply USB offset."""
_login_session(client)
mock_decoder = MagicMock()
mock_decoder.is_running = False
mock_decoder.start.return_value = True
with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder), \
patch('routes.wefax.app_module.claim_sdr_device', return_value=None):
response = client.post(
'/wefax/start',
data=json.dumps({
'frequency_khz': 4298,
'station': 'NOJ',
'device': 0,
'frequency_reference': 'dial',
}),
content_type='application/json',
)
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'started'
assert data['usb_offset_applied'] is False
assert math.isclose(data['tuned_frequency_khz'], 4298.0, abs_tol=1e-6)
assert data['frequency_reference'] == 'dial'
start_kwargs = mock_decoder.start.call_args.kwargs
assert math.isclose(start_kwargs['frequency_khz'], 4298.0, abs_tol=1e-6)
def test_start_device_busy(self, client):
"""POST /wefax/start should return 409 when device is busy."""
@@ -509,83 +509,83 @@ class TestWeFaxRoutes:
assert response.status_code == 400
def test_delete_image_wrong_extension(self, client):
"""DELETE /wefax/images/<filename> should reject non-PNG."""
_login_session(client)
mock_decoder = MagicMock()
with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder):
response = client.delete('/wefax/images/test.jpg')
assert response.status_code == 400
def test_schedule_enable_applies_usb_alignment(self, client):
"""Scheduler should receive tuned USB dial frequency in auto mode."""
_login_session(client)
mock_scheduler = MagicMock()
mock_scheduler.enable.return_value = {
'enabled': True,
'scheduled_count': 2,
'total_broadcasts': 2,
}
with patch('utils.wefax_scheduler.get_wefax_scheduler', return_value=mock_scheduler):
response = client.post(
'/wefax/schedule/enable',
data=json.dumps({
'station': 'NOJ',
'frequency_khz': 4298,
'device': 0,
}),
content_type='application/json',
)
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'ok'
assert data['usb_offset_applied'] is True
assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6)
enable_kwargs = mock_scheduler.enable.call_args.kwargs
assert math.isclose(enable_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6)
class TestWeFaxProgressCallback:
"""Regression tests for WeFax route-level progress callback behavior."""
def test_terminal_progress_releases_active_device(self):
"""Terminal decoder events must release any manually claimed SDR."""
import routes.wefax as wefax_routes
original_device = wefax_routes.wefax_active_device
try:
wefax_routes.wefax_active_device = 3
with patch('routes.wefax.app_module.release_sdr_device') as mock_release:
wefax_routes._progress_callback({
'type': 'wefax_progress',
'status': 'error',
'message': 'decode failed',
})
mock_release.assert_called_once_with(3)
assert wefax_routes.wefax_active_device is None
finally:
wefax_routes.wefax_active_device = original_device
def test_non_terminal_progress_does_not_release_active_device(self):
"""Non-terminal progress updates must not release SDR ownership."""
import routes.wefax as wefax_routes
original_device = wefax_routes.wefax_active_device
try:
wefax_routes.wefax_active_device = 4
with patch('routes.wefax.app_module.release_sdr_device') as mock_release:
wefax_routes._progress_callback({
'type': 'wefax_progress',
'status': 'receiving',
'line_count': 120,
})
mock_release.assert_not_called()
assert wefax_routes.wefax_active_device == 4
finally:
wefax_routes.wefax_active_device = original_device
def test_delete_image_wrong_extension(self, client):
"""DELETE /wefax/images/<filename> should reject non-PNG."""
_login_session(client)
mock_decoder = MagicMock()
with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder):
response = client.delete('/wefax/images/test.jpg')
assert response.status_code == 400
def test_schedule_enable_applies_usb_alignment(self, client):
"""Scheduler should receive tuned USB dial frequency in auto mode."""
_login_session(client)
mock_scheduler = MagicMock()
mock_scheduler.enable.return_value = {
'enabled': True,
'scheduled_count': 2,
'total_broadcasts': 2,
}
with patch('utils.wefax_scheduler.get_wefax_scheduler', return_value=mock_scheduler):
response = client.post(
'/wefax/schedule/enable',
data=json.dumps({
'station': 'NOJ',
'frequency_khz': 4298,
'device': 0,
}),
content_type='application/json',
)
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'ok'
assert data['usb_offset_applied'] is True
assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6)
enable_kwargs = mock_scheduler.enable.call_args.kwargs
assert math.isclose(enable_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6)
class TestWeFaxProgressCallback:
"""Regression tests for WeFax route-level progress callback behavior."""
def test_terminal_progress_releases_active_device(self):
"""Terminal decoder events must release any manually claimed SDR."""
import routes.wefax as wefax_routes
original_device = wefax_routes.wefax_active_device
try:
wefax_routes.wefax_active_device = 3
with patch('routes.wefax.app_module.release_sdr_device') as mock_release:
wefax_routes._progress_callback({
'type': 'wefax_progress',
'status': 'error',
'message': 'decode failed',
})
mock_release.assert_called_once_with(3, 'rtlsdr')
assert wefax_routes.wefax_active_device is None
finally:
wefax_routes.wefax_active_device = original_device
def test_non_terminal_progress_does_not_release_active_device(self):
"""Non-terminal progress updates must not release SDR ownership."""
import routes.wefax as wefax_routes
original_device = wefax_routes.wefax_active_device
try:
wefax_routes.wefax_active_device = 4
with patch('routes.wefax.app_module.release_sdr_device') as mock_release:
wefax_routes._progress_callback({
'type': 'wefax_progress',
'status': 'receiving',
'line_count': 120,
})
mock_release.assert_not_called()
assert wefax_routes.wefax_active_device == 4
finally:
wefax_routes.wefax_active_device = original_device