Files
intercept/tests/test_agent_modes.py
cemaxecuter 3372daca84 Add comprehensive agent mode tests and listening_post SDR check
- Add SDR availability check to listening_post mode startup
- Create tests/test_agent_modes.py with 29 comprehensive tests covering:
  - Mode lifecycle tests (start/stop for all modes)
  - SDR conflict detection (same device vs different device)
  - Process verification (immediate exit detection)
  - Data snapshot operations
  - Error handling (missing tools, invalid modes)
  - Cleanup verification (process termination, thread stopping)
  - Multi-mode simultaneous operation
  - GPS integration
2026-01-26 12:02:52 -05:00

485 lines
19 KiB
Python

"""
Comprehensive tests for Intercept Agent mode operations.
Tests cover:
- All 13 mode start/stop lifecycles
- SDR device conflict detection
- Process verification (subprocess failure handling)
- Data snapshot operations
- Multi-mode scenarios
- Error handling and edge cases
"""
import os
import sys
import json
import time
import pytest
import threading
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime, timezone
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture
def mode_manager():
"""Create a fresh ModeManager instance for testing."""
from intercept_agent import ModeManager
manager = ModeManager()
yield manager
# Cleanup: stop all modes
for mode in list(manager.running_modes.keys()):
try:
manager.stop_mode(mode)
except Exception:
pass
@pytest.fixture
def mock_subprocess():
"""Mock subprocess.Popen for controlled testing."""
with patch('subprocess.Popen') as mock_popen:
mock_proc = MagicMock()
mock_proc.poll.return_value = None # Process is running
mock_proc.stdout = MagicMock()
mock_proc.stderr = MagicMock()
mock_proc.stderr.read.return_value = b''
mock_proc.stdin = MagicMock()
mock_proc.pid = 12345
mock_proc.wait.return_value = 0
mock_popen.return_value = mock_proc
yield mock_popen, mock_proc
@pytest.fixture
def mock_tools():
"""Mock tool availability checks."""
tools = {
'rtl_433': '/usr/bin/rtl_433',
'rtl_fm': '/usr/bin/rtl_fm',
'dump1090': '/usr/bin/dump1090',
'multimon-ng': '/usr/bin/multimon-ng',
'airodump-ng': '/usr/sbin/airodump-ng',
'acarsdec': '/usr/bin/acarsdec',
'AIS-catcher': '/usr/bin/AIS-catcher',
'direwolf': '/usr/bin/direwolf',
'rtlamr': '/usr/bin/rtlamr',
'rtl_tcp': '/usr/bin/rtl_tcp',
'bluetoothctl': '/usr/bin/bluetoothctl',
}
with patch('shutil.which', side_effect=lambda x: tools.get(x)):
yield tools
# =============================================================================
# SDR Mode List
# =============================================================================
SDR_MODES = ['sensor', 'adsb', 'pager', 'ais', 'acars', 'aprs', 'rtlamr', 'dsc', 'listening_post']
NON_SDR_MODES = ['wifi', 'bluetooth', 'tscm', 'satellite']
ALL_MODES = SDR_MODES + NON_SDR_MODES
# =============================================================================
# Mode Lifecycle Tests
# =============================================================================
class TestModeLifecycle:
"""Test start/stop lifecycle for all modes."""
def test_sensor_mode_lifecycle(self, mode_manager, mock_subprocess, mock_tools):
"""Sensor mode should start and stop cleanly."""
mock_popen, mock_proc = mock_subprocess
# Start
result = mode_manager.start_mode('sensor', {'frequency': '433.92', 'device': '0'})
assert result['status'] == 'started'
assert 'sensor' in mode_manager.running_modes
# Stop
result = mode_manager.stop_mode('sensor')
assert result['status'] == 'stopped'
assert 'sensor' not in mode_manager.running_modes
def test_adsb_mode_lifecycle(self, mode_manager, mock_subprocess, mock_tools):
"""ADS-B mode should start and stop cleanly."""
mock_popen, mock_proc = mock_subprocess
# Mock socket for SBS connection check
with patch('socket.socket') as mock_socket:
mock_sock = MagicMock()
mock_sock.connect_ex.return_value = 1 # Port not in use
mock_socket.return_value = mock_sock
result = mode_manager.start_mode('adsb', {'device': '0', 'gain': '40'})
# May fail due to SBS port check, but shouldn't crash
assert result['status'] in ['started', 'error']
def test_pager_mode_lifecycle(self, mode_manager, mock_subprocess, mock_tools):
"""Pager mode should start and stop cleanly."""
mock_popen, mock_proc = mock_subprocess
result = mode_manager.start_mode('pager', {
'frequency': '929.6125',
'protocols': ['POCSAG512', 'POCSAG1200']
})
assert result['status'] == 'started'
assert 'pager' in mode_manager.running_modes
result = mode_manager.stop_mode('pager')
assert result['status'] == 'stopped'
def test_wifi_mode_lifecycle(self, mode_manager, mock_subprocess, mock_tools):
"""WiFi mode should start and stop cleanly."""
mock_popen, mock_proc = mock_subprocess
# Mock glob for CSV file detection
with patch('glob.glob', return_value=[]):
with patch('tempfile.mkdtemp', return_value='/tmp/test'):
result = mode_manager.start_mode('wifi', {
'interface': 'wlan0',
'scan_type': 'quick'
})
# Quick scan returns data directly
assert result['status'] in ['started', 'error', 'success']
def test_bluetooth_mode_lifecycle(self, mode_manager, mock_subprocess, mock_tools):
"""Bluetooth mode should start and stop cleanly."""
mock_popen, mock_proc = mock_subprocess
result = mode_manager.start_mode('bluetooth', {'adapter': 'hci0'})
assert result['status'] == 'started'
assert 'bluetooth' in mode_manager.running_modes
# Give thread time to start
time.sleep(0.1)
result = mode_manager.stop_mode('bluetooth')
assert result['status'] == 'stopped'
def test_satellite_mode_lifecycle(self, mode_manager):
"""Satellite mode should work without SDR."""
# Satellite mode is computational only
result = mode_manager.start_mode('satellite', {
'lat': 33.5,
'lon': -82.1,
'min_elevation': 10
})
assert result['status'] in ['started', 'error'] # May fail if skyfield not installed
def test_tscm_mode_lifecycle(self, mode_manager, mock_subprocess, mock_tools):
"""TSCM mode should start and stop cleanly."""
mock_popen, mock_proc = mock_subprocess
result = mode_manager.start_mode('tscm', {
'wifi': True,
'bluetooth': True,
'rf': False
})
assert result['status'] == 'started'
result = mode_manager.stop_mode('tscm')
assert result['status'] == 'stopped'
# =============================================================================
# SDR Conflict Detection Tests
# =============================================================================
class TestSDRConflictDetection:
"""Test SDR device conflict detection."""
def test_same_device_conflict(self, mode_manager, mock_subprocess, mock_tools):
"""Starting two SDR modes on same device should fail."""
mock_popen, mock_proc = mock_subprocess
# Start sensor on device 0
result1 = mode_manager.start_mode('sensor', {'device': '0'})
assert result1['status'] == 'started'
# Try to start pager on device 0 - should fail
result2 = mode_manager.start_mode('pager', {'device': '0'})
assert result2['status'] == 'error'
assert 'in use' in result2['message'].lower()
def test_different_device_no_conflict(self, mode_manager, mock_subprocess, mock_tools):
"""Starting SDR modes on different devices should work."""
mock_popen, mock_proc = mock_subprocess
# Start sensor on device 0
result1 = mode_manager.start_mode('sensor', {'device': '0'})
assert result1['status'] == 'started'
# Start pager on device 1 - should work
result2 = mode_manager.start_mode('pager', {'device': '1'})
assert result2['status'] == 'started'
assert len(mode_manager.running_modes) == 2
def test_non_sdr_modes_no_conflict(self, mode_manager, mock_subprocess, mock_tools):
"""Non-SDR modes should not conflict with SDR modes."""
mock_popen, mock_proc = mock_subprocess
# Start sensor (SDR)
result1 = mode_manager.start_mode('sensor', {'device': '0'})
assert result1['status'] == 'started'
# Start bluetooth (non-SDR) - should work
result2 = mode_manager.start_mode('bluetooth', {'adapter': 'hci0'})
assert result2['status'] == 'started'
assert len(mode_manager.running_modes) == 2
def test_get_sdr_in_use(self, mode_manager, mock_subprocess, mock_tools):
"""get_sdr_in_use should return correct mode."""
mock_popen, mock_proc = mock_subprocess
# No SDR in use initially
assert mode_manager.get_sdr_in_use(0) is None
# Start sensor
mode_manager.start_mode('sensor', {'device': '0'})
# Device 0 now in use by sensor
assert mode_manager.get_sdr_in_use(0) == 'sensor'
assert mode_manager.get_sdr_in_use(1) is None
# =============================================================================
# Process Verification Tests
# =============================================================================
class TestProcessVerification:
"""Test process startup verification."""
def test_immediate_process_exit_detected(self, mode_manager, mock_tools):
"""Process that exits immediately should return error."""
with patch('subprocess.Popen') as mock_popen:
mock_proc = MagicMock()
mock_proc.poll.return_value = 1 # Process exited
mock_proc.stderr.read.return_value = b'device busy'
mock_popen.return_value = mock_proc
result = mode_manager.start_mode('sensor', {'device': '0'})
assert result['status'] == 'error'
assert 'sensor' not in mode_manager.running_modes
def test_running_process_accepted(self, mode_manager, mock_subprocess, mock_tools):
"""Process that stays running should be accepted."""
mock_popen, mock_proc = mock_subprocess
mock_proc.poll.return_value = None # Still running
result = mode_manager.start_mode('sensor', {'device': '0'})
assert result['status'] == 'started'
assert 'sensor' in mode_manager.running_modes
def test_error_message_from_stderr(self, mode_manager, mock_tools):
"""Error message should include stderr output."""
with patch('subprocess.Popen') as mock_popen:
mock_proc = MagicMock()
mock_proc.poll.return_value = 1
mock_proc.stderr.read.return_value = b'usb_claim_interface error -6'
mock_popen.return_value = mock_proc
result = mode_manager.start_mode('sensor', {'device': '0'})
assert result['status'] == 'error'
assert 'usb_claim_interface' in result['message'] or 'failed' in result['message'].lower()
# =============================================================================
# Data Snapshot Tests
# =============================================================================
class TestDataSnapshots:
"""Test data snapshot operations."""
def test_get_mode_data_empty(self, mode_manager):
"""get_mode_data for non-running mode should return empty."""
result = mode_manager.get_mode_data('sensor')
assert result['mode'] == 'sensor'
# Mode not running - should have empty data or 'running' field
assert result.get('running') is False or result.get('data') == [] or 'status' in result
def test_get_mode_data_running(self, mode_manager, mock_subprocess, mock_tools):
"""get_mode_data for running mode should return status."""
mock_popen, mock_proc = mock_subprocess
mode_manager.start_mode('sensor', {'device': '0'})
result = mode_manager.get_mode_data('sensor')
assert result['mode'] == 'sensor'
# Mode is running - should indicate running status
assert result.get('running') is True or 'data' in result or 'status' in result
def test_data_queue_limit(self, mode_manager):
"""Data queues should respect max size limits."""
import queue
# Manually test queue limit
test_queue = queue.Queue(maxsize=100)
for i in range(150):
if test_queue.full():
test_queue.get_nowait() # Remove old item
test_queue.put_nowait({'index': i})
assert test_queue.qsize() <= 100
# =============================================================================
# Mode Status Tests
# =============================================================================
class TestModeStatus:
"""Test mode status reporting."""
def test_status_includes_all_modes(self, mode_manager):
"""Status should include all running modes."""
status = mode_manager.get_status()
assert 'running_modes' in status
assert 'running_modes_detail' in status
assert isinstance(status['running_modes'], list)
def test_running_modes_detail_includes_device(self, mode_manager, mock_subprocess, mock_tools):
"""Running modes detail should include device info."""
mock_popen, mock_proc = mock_subprocess
mode_manager.start_mode('sensor', {'device': '0'})
status = mode_manager.get_status()
assert 'sensor' in status['running_modes_detail']
detail = status['running_modes_detail']['sensor']
assert 'device' in detail or 'params' in detail
# =============================================================================
# Error Handling Tests
# =============================================================================
class TestErrorHandling:
"""Test error handling scenarios."""
def test_missing_tool_returns_error(self, mode_manager):
"""Mode should fail gracefully if required tool is missing."""
with patch('shutil.which', return_value=None):
result = mode_manager.start_mode('sensor', {'device': '0'})
assert result['status'] == 'error'
# Error message may vary - check for common patterns
msg = result['message'].lower()
assert 'not found' in msg or 'not available' in msg or 'missing' in msg
def test_invalid_mode_returns_error(self, mode_manager):
"""Invalid mode name should return error."""
result = mode_manager.start_mode('invalid_mode', {})
assert result['status'] == 'error'
def test_double_start_returns_already_running(self, mode_manager, mock_subprocess, mock_tools):
"""Starting already-running mode should return appropriate status."""
mock_popen, mock_proc = mock_subprocess
mode_manager.start_mode('sensor', {'device': '0'})
result = mode_manager.start_mode('sensor', {'device': '0'})
assert result['status'] in ['already_running', 'error']
def test_stop_non_running_mode(self, mode_manager):
"""Stopping non-running mode should handle gracefully."""
result = mode_manager.stop_mode('sensor')
assert result['status'] in ['stopped', 'not_running']
# =============================================================================
# Cleanup Tests
# =============================================================================
class TestCleanup:
"""Test mode cleanup on stop."""
def test_process_terminated_on_stop(self, mode_manager, mock_subprocess, mock_tools):
"""Processes should be terminated when mode is stopped."""
mock_popen, mock_proc = mock_subprocess
mode_manager.start_mode('sensor', {'device': '0'})
mode_manager.stop_mode('sensor')
# Verify terminate was called
mock_proc.terminate.assert_called()
def test_threads_stopped_on_stop(self, mode_manager, mock_subprocess, mock_tools):
"""Output threads should be stopped when mode is stopped."""
mock_popen, mock_proc = mock_subprocess
mode_manager.start_mode('bluetooth', {'adapter': 'hci0'})
time.sleep(0.1) # Let thread start
mode_manager.stop_mode('bluetooth')
# Thread should no longer be in output_threads or should be stopped
assert 'bluetooth' not in mode_manager.output_threads or \
not mode_manager.output_threads['bluetooth'].is_alive()
# =============================================================================
# Multi-Mode Tests
# =============================================================================
class TestMultiMode:
"""Test multiple modes running simultaneously."""
def test_multiple_non_sdr_modes(self, mode_manager, mock_subprocess, mock_tools):
"""Multiple non-SDR modes should run simultaneously."""
mock_popen, mock_proc = mock_subprocess
result1 = mode_manager.start_mode('bluetooth', {'adapter': 'hci0'})
result2 = mode_manager.start_mode('tscm', {'wifi': True, 'bluetooth': False})
assert result1['status'] == 'started'
assert result2['status'] == 'started'
assert len(mode_manager.running_modes) == 2
def test_stop_all_modes(self, mode_manager, mock_subprocess, mock_tools):
"""All modes should stop cleanly."""
mock_popen, mock_proc = mock_subprocess
mode_manager.start_mode('sensor', {'device': '0'})
mode_manager.start_mode('bluetooth', {'adapter': 'hci0'})
# Stop all
for mode in list(mode_manager.running_modes.keys()):
mode_manager.stop_mode(mode)
assert len(mode_manager.running_modes) == 0
# =============================================================================
# GPS Integration Tests
# =============================================================================
class TestGPSIntegration:
"""Test GPS coordinate integration."""
def test_status_includes_gps_flag(self, mode_manager):
"""Status should indicate GPS availability."""
status = mode_manager.get_status()
assert 'gps' in status
def test_mode_start_includes_gps_flag(self, mode_manager, mock_subprocess, mock_tools):
"""Mode start response should include GPS status."""
mock_popen, mock_proc = mock_subprocess
result = mode_manager.start_mode('sensor', {'device': '0'})
if result['status'] == 'started':
assert 'gps_enabled' in result
# =============================================================================
# Run Tests
# =============================================================================
if __name__ == '__main__':
pytest.main([__file__, '-v'])