chore: commit all changes and remove large IQ captures from tracking

Add .gitignore entry for data/subghz/captures/ to prevent large
IQ recording files from being committed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-12 23:30:37 +00:00
parent 4639146f05
commit 7c3ec9e920
46 changed files with 10792 additions and 462 deletions
+78 -4
View File
@@ -1,7 +1,9 @@
"""Tests for the DMR / Digital Voice decoding module."""
import queue
from unittest.mock import patch, MagicMock
import pytest
import routes.dmr as dmr_module
from routes.dmr import parse_dsd_output, _DSD_PROTOCOL_FLAGS, _DSD_FME_PROTOCOL_FLAGS, _DSD_FME_MODULATION
@@ -132,9 +134,9 @@ def test_dsd_fme_flags_differ_from_classic():
def test_dsd_fme_protocol_flags_known_values():
"""dsd-fme flags use its own flag names (NOT classic DSD mappings)."""
assert _DSD_FME_PROTOCOL_FLAGS['auto'] == ['-ft'] # XDMA
assert _DSD_FME_PROTOCOL_FLAGS['auto'] == ['-fa'] # Broad auto
assert _DSD_FME_PROTOCOL_FLAGS['dmr'] == ['-fs'] # Simplex (-fd is D-STAR!)
assert _DSD_FME_PROTOCOL_FLAGS['p25'] == ['-f1'] # NOT -fp (ProVoice in fme)
assert _DSD_FME_PROTOCOL_FLAGS['p25'] == ['-ft'] # P25 P1/P2 coverage
assert _DSD_FME_PROTOCOL_FLAGS['nxdn'] == ['-fn']
assert _DSD_FME_PROTOCOL_FLAGS['dstar'] == ['-fd'] # -fd is D-STAR in dsd-fme
assert _DSD_FME_PROTOCOL_FLAGS['provoice'] == ['-fp'] # NOT -fv
@@ -153,9 +155,9 @@ def test_dsd_protocol_flags_known_values():
def test_dsd_fme_modulation_hints():
"""C4FM modulation hints should be set for C4FM protocols."""
assert _DSD_FME_MODULATION['dmr'] == ['-mc']
assert _DSD_FME_MODULATION['p25'] == ['-mc']
assert _DSD_FME_MODULATION['nxdn'] == ['-mc']
# D-Star and ProVoice should not have forced modulation
# P25, D-Star and ProVoice should not have forced modulation
assert 'p25' not in _DSD_FME_MODULATION
assert 'dstar' not in _DSD_FME_MODULATION
assert 'provoice' not in _DSD_FME_MODULATION
@@ -172,6 +174,40 @@ def auth_client(client):
return client
@pytest.fixture(autouse=True)
def reset_dmr_globals():
"""Reset DMR globals before/after each test to avoid cross-test bleed."""
dmr_module.dmr_rtl_process = None
dmr_module.dmr_dsd_process = None
dmr_module.dmr_thread = None
dmr_module.dmr_running = False
dmr_module.dmr_has_audio = False
dmr_module.dmr_active_device = None
with dmr_module._ffmpeg_sinks_lock:
dmr_module._ffmpeg_sinks.clear()
try:
while True:
dmr_module.dmr_queue.get_nowait()
except queue.Empty:
pass
yield
dmr_module.dmr_rtl_process = None
dmr_module.dmr_dsd_process = None
dmr_module.dmr_thread = None
dmr_module.dmr_running = False
dmr_module.dmr_has_audio = False
dmr_module.dmr_active_device = None
with dmr_module._ffmpeg_sinks_lock:
dmr_module._ffmpeg_sinks.clear()
try:
while True:
dmr_module.dmr_queue.get_nowait()
except queue.Empty:
pass
def test_dmr_tools(auth_client):
"""Tools endpoint should return availability info."""
resp = auth_client.get('/dmr/tools')
@@ -235,3 +271,41 @@ def test_dmr_stream_mimetype(auth_client):
"""Stream should return event-stream content type."""
resp = auth_client.get('/dmr/stream')
assert resp.content_type.startswith('text/event-stream')
def test_dmr_start_exception_cleans_up_resources(auth_client):
"""If startup fails after rtl_fm launch, process/device state should be reset."""
rtl_proc = MagicMock()
rtl_proc.poll.return_value = None
rtl_proc.wait.return_value = 0
rtl_proc.stdout = MagicMock()
rtl_proc.stderr = MagicMock()
builder = MagicMock()
builder.build_fm_demod_command.return_value = ['rtl_fm', '-f', '462.5625M']
with patch('routes.dmr.find_dsd', return_value=('/usr/bin/dsd', False)), \
patch('routes.dmr.find_rtl_fm', return_value='/usr/bin/rtl_fm'), \
patch('routes.dmr.find_ffmpeg', return_value=None), \
patch('routes.dmr.SDRFactory.create_default_device', return_value=MagicMock()), \
patch('routes.dmr.SDRFactory.get_builder', return_value=builder), \
patch('routes.dmr.app_module.claim_sdr_device', return_value=None), \
patch('routes.dmr.app_module.release_sdr_device') as release_mock, \
patch('routes.dmr.register_process') as register_mock, \
patch('routes.dmr.unregister_process') as unregister_mock, \
patch('routes.dmr.subprocess.Popen', side_effect=[rtl_proc, RuntimeError('dsd launch failed')]):
resp = auth_client.post('/dmr/start', json={
'frequency': 462.5625,
'protocol': 'auto',
'device': 0,
})
assert resp.status_code == 500
assert 'dsd launch failed' in resp.get_json()['message']
register_mock.assert_called_once_with(rtl_proc)
rtl_proc.terminate.assert_called_once()
unregister_mock.assert_called_once_with(rtl_proc)
release_mock.assert_called_once_with(0)
assert dmr_module.dmr_running is False
assert dmr_module.dmr_rtl_process is None
assert dmr_module.dmr_dsd_process is None
+20 -10
View File
@@ -46,20 +46,30 @@ class TestHealthEndpoint:
assert 'processes' in data
assert 'data' in data
def test_health_process_status(self, client):
"""Test health endpoint reports process status."""
response = client.get('/health')
data = json.loads(response.data)
def test_health_process_status(self, client):
"""Test health endpoint reports process status."""
response = client.get('/health')
data = json.loads(response.data)
processes = data['processes']
assert 'pager' in processes
assert 'sensor' in processes
assert 'adsb' in processes
assert 'wifi' in processes
assert 'bluetooth' in processes
class TestDevicesEndpoint:
assert 'adsb' in processes
assert 'wifi' in processes
assert 'bluetooth' in processes
def test_health_reports_dmr_route_process(self, client):
"""Health should reflect DMR route module state (not stale app globals)."""
mock_proc = MagicMock()
mock_proc.poll.return_value = None
with patch('routes.dmr.dmr_running', True), \
patch('routes.dmr.dmr_dsd_process', mock_proc):
response = client.get('/health')
data = json.loads(response.data)
assert data['processes']['dmr'] is True
class TestDevicesEndpoint:
"""Tests for devices endpoint."""
def test_get_devices(self, client):
+38
View File
@@ -0,0 +1,38 @@
"""Tests for rtl_fm modulation token mapping."""
from routes.listening_post import _rtl_fm_demod_mode as listening_post_rtl_mode
from utils.sdr.base import SDRDevice, SDRType
from utils.sdr.rtlsdr import RTLSDRCommandBuilder, _rtl_fm_demod_mode as builder_rtl_mode
def _dummy_rtlsdr_device() -> SDRDevice:
return SDRDevice(
sdr_type=SDRType.RTL_SDR,
index=0,
name='RTL-SDR',
serial='00000001',
driver='rtlsdr',
capabilities=RTLSDRCommandBuilder.CAPABILITIES,
)
def test_rtl_fm_modulation_maps_wfm_to_wbfm() -> None:
assert listening_post_rtl_mode('wfm') == 'wbfm'
assert builder_rtl_mode('wfm') == 'wbfm'
def test_rtl_fm_modulation_keeps_other_modes() -> None:
assert listening_post_rtl_mode('fm') == 'fm'
assert builder_rtl_mode('am') == 'am'
def test_rtlsdr_builder_uses_wbfm_token_for_wfm() -> None:
builder = RTLSDRCommandBuilder()
cmd = builder.build_fm_demod_command(
device=_dummy_rtlsdr_device(),
frequency_mhz=98.1,
modulation='wfm',
)
mode_index = cmd.index('-M')
assert cmd[mode_index + 1] == 'wbfm'
+608
View File
@@ -0,0 +1,608 @@
"""Tests for SubGhzManager utility module."""
from __future__ import annotations
import json
import os
import subprocess
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from utils.subghz import SubGhzManager, SubGhzCapture
@pytest.fixture
def tmp_data_dir(tmp_path):
"""Create a temporary data directory for SubGhz captures."""
data_dir = tmp_path / 'subghz'
data_dir.mkdir()
(data_dir / 'captures').mkdir()
return data_dir
@pytest.fixture
def manager(tmp_data_dir):
"""Create a SubGhzManager with temp directory."""
return SubGhzManager(data_dir=tmp_data_dir)
class TestSubGhzManagerInit:
def test_creates_data_dirs(self, tmp_path):
data_dir = tmp_path / 'new_subghz'
mgr = SubGhzManager(data_dir=data_dir)
assert (data_dir / 'captures').is_dir()
def test_active_mode_idle(self, manager):
assert manager.active_mode == 'idle'
def test_get_status_idle(self, manager):
status = manager.get_status()
assert status['mode'] == 'idle'
class TestToolDetection:
def test_check_hackrf_found(self, manager):
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'):
assert manager.check_hackrf() is True
def test_check_hackrf_not_found(self, manager):
with patch('shutil.which', return_value=None):
manager._hackrf_available = None # reset cache
assert manager.check_hackrf() is False
def test_check_rtl433_found(self, manager):
with patch('shutil.which', return_value='/usr/bin/rtl_433'):
assert manager.check_rtl433() is True
def test_check_sweep_found(self, manager):
with patch('shutil.which', return_value='/usr/bin/hackrf_sweep'):
assert manager.check_sweep() is True
class TestReceive:
def test_start_receive_no_hackrf(self, manager):
with patch('shutil.which', return_value=None):
manager._hackrf_available = None
result = manager.start_receive(frequency_hz=433920000)
assert result['status'] == 'error'
assert 'not found' in result['message']
def test_start_receive_success(self, manager):
mock_proc = MagicMock()
mock_proc.poll.return_value = None
mock_proc.stderr = MagicMock()
mock_proc.stderr.readline = MagicMock(return_value=b'')
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch('subprocess.Popen', return_value=mock_proc), \
patch.object(manager, 'check_hackrf_device', return_value=True), \
patch('utils.subghz.register_process'):
manager._hackrf_available = None
result = manager.start_receive(
frequency_hz=433920000,
sample_rate=2000000,
lna_gain=32,
vga_gain=20,
)
assert result['status'] == 'started'
assert result['frequency_hz'] == 433920000
assert manager.active_mode == 'rx'
def test_start_receive_already_running(self, manager):
mock_proc = MagicMock()
mock_proc.poll.return_value = None
manager._rx_process = mock_proc
result = manager.start_receive(frequency_hz=433920000)
assert result['status'] == 'error'
assert 'Already running' in result['message']
def test_stop_receive_not_running(self, manager):
result = manager.stop_receive()
assert result['status'] == 'not_running'
def test_stop_receive_creates_metadata(self, manager, tmp_data_dir):
# Create a fake IQ file
iq_file = tmp_data_dir / 'captures' / 'test.iq'
iq_file.write_bytes(b'\x00' * 1024)
mock_proc = MagicMock()
mock_proc.poll.return_value = None
manager._rx_process = mock_proc
manager._rx_file = iq_file
manager._rx_frequency_hz = 433920000
manager._rx_sample_rate = 2000000
manager._rx_lna_gain = 32
manager._rx_vga_gain = 20
manager._rx_start_time = 1000.0
manager._rx_bursts = [{'start_seconds': 1.23, 'duration_seconds': 0.15, 'peak_level': 42}]
with patch('utils.subghz.safe_terminate'), \
patch('time.time', return_value=1005.0):
result = manager.stop_receive()
assert result['status'] == 'stopped'
assert 'capture' in result
assert result['capture']['frequency_hz'] == 433920000
# Verify JSON sidecar was written
meta_path = iq_file.with_suffix('.json')
assert meta_path.exists()
meta = json.loads(meta_path.read_text())
assert meta['frequency_hz'] == 433920000
assert isinstance(meta.get('bursts'), list)
assert meta['bursts'][0]['peak_level'] == 42
class TestTxSafety:
def test_validate_tx_frequency_ism_433(self):
result = SubGhzManager.validate_tx_frequency(433920000)
assert result is None # Valid
def test_validate_tx_frequency_ism_315(self):
result = SubGhzManager.validate_tx_frequency(315000000)
assert result is None
def test_validate_tx_frequency_ism_915(self):
result = SubGhzManager.validate_tx_frequency(915000000)
assert result is None
def test_validate_tx_frequency_out_of_band(self):
result = SubGhzManager.validate_tx_frequency(100000000) # 100 MHz
assert result is not None
assert 'outside allowed TX bands' in result
def test_validate_tx_frequency_between_bands(self):
result = SubGhzManager.validate_tx_frequency(500000000) # 500 MHz
assert result is not None
def test_transmit_no_hackrf(self, manager):
with patch('shutil.which', return_value=None):
manager._hackrf_available = None
result = manager.transmit(capture_id='abc123')
assert result['status'] == 'error'
def test_transmit_capture_not_found(self, manager):
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True):
manager._hackrf_available = None
result = manager.transmit(capture_id='nonexistent')
assert result['status'] == 'error'
assert 'not found' in result['message']
def test_transmit_out_of_band_rejected(self, manager, tmp_data_dir):
# Create a capture with out-of-band frequency
meta = {
'id': 'test123',
'filename': 'test.iq',
'frequency_hz': 100000000, # 100 MHz - out of ISM
'sample_rate': 2000000,
'lna_gain': 32,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
}
meta_path = tmp_data_dir / 'captures' / 'test.json'
meta_path.write_text(json.dumps(meta))
(tmp_data_dir / 'captures' / 'test.iq').write_bytes(b'\x00' * 100)
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True):
manager._hackrf_available = None
result = manager.transmit(capture_id='test123')
assert result['status'] == 'error'
assert 'outside allowed TX bands' in result['message']
def test_transmit_already_running(self, manager):
mock_proc = MagicMock()
mock_proc.poll.return_value = None
manager._rx_process = mock_proc
result = manager.transmit(capture_id='test123')
assert result['status'] == 'error'
assert 'Already running' in result['message']
def test_transmit_segment_extracts_range(self, manager, tmp_data_dir):
meta = {
'id': 'seg001',
'filename': 'seg.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 1.0,
'size_bytes': 2000,
}
(tmp_data_dir / 'captures' / 'seg.json').write_text(json.dumps(meta))
(tmp_data_dir / 'captures' / 'seg.iq').write_bytes(bytes(range(200)) * 10)
mock_proc = MagicMock()
mock_proc.poll.return_value = None
mock_timer = MagicMock()
mock_timer.start = MagicMock()
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True), \
patch('subprocess.Popen', return_value=mock_proc), \
patch('utils.subghz.register_process'), \
patch('threading.Timer', return_value=mock_timer), \
patch('threading.Thread') as mock_thread_cls:
mock_thread = MagicMock()
mock_thread.start = MagicMock()
mock_thread_cls.return_value = mock_thread
manager._hackrf_available = None
result = manager.transmit(
capture_id='seg001',
start_seconds=0.2,
duration_seconds=0.3,
)
assert result['status'] == 'transmitting'
assert result['segment'] is not None
assert result['segment']['duration_seconds'] == pytest.approx(0.3, abs=0.01)
assert manager._tx_temp_file is not None
assert manager._tx_temp_file.exists()
class TestCaptureLibrary:
def test_list_captures_empty(self, manager):
captures = manager.list_captures()
assert captures == []
def test_list_captures_with_data(self, manager, tmp_data_dir):
meta = {
'id': 'cap001',
'filename': 'test.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'lna_gain': 32,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 5.0,
'size_bytes': 1024,
'label': 'test capture',
}
(tmp_data_dir / 'captures' / 'test.json').write_text(json.dumps(meta))
captures = manager.list_captures()
assert len(captures) == 1
assert captures[0].capture_id == 'cap001'
assert captures[0].label == 'test capture'
def test_get_capture(self, manager, tmp_data_dir):
meta = {
'id': 'cap002',
'filename': 'test2.iq',
'frequency_hz': 315000000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:00:00Z',
}
(tmp_data_dir / 'captures' / 'test2.json').write_text(json.dumps(meta))
cap = manager.get_capture('cap002')
assert cap is not None
assert cap.frequency_hz == 315000000
def test_get_capture_not_found(self, manager):
cap = manager.get_capture('nonexistent')
assert cap is None
def test_delete_capture(self, manager, tmp_data_dir):
captures_dir = tmp_data_dir / 'captures'
iq_path = captures_dir / 'delete_me.iq'
meta_path = captures_dir / 'delete_me.json'
iq_path.write_bytes(b'\x00' * 100)
meta_path.write_text(json.dumps({
'id': 'del001',
'filename': 'delete_me.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:00:00Z',
}))
assert manager.delete_capture('del001') is True
assert not iq_path.exists()
assert not meta_path.exists()
def test_delete_capture_not_found(self, manager):
assert manager.delete_capture('nonexistent') is False
def test_update_label(self, manager, tmp_data_dir):
meta = {
'id': 'lbl001',
'filename': 'label_test.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:00:00Z',
'label': '',
}
meta_path = tmp_data_dir / 'captures' / 'label_test.json'
meta_path.write_text(json.dumps(meta))
assert manager.update_capture_label('lbl001', 'Garage Remote') is True
updated = json.loads(meta_path.read_text())
assert updated['label'] == 'Garage Remote'
assert updated['label_source'] == 'manual'
def test_update_label_not_found(self, manager):
assert manager.update_capture_label('nonexistent', 'test') is False
def test_get_capture_path(self, manager, tmp_data_dir):
captures_dir = tmp_data_dir / 'captures'
iq_path = captures_dir / 'path_test.iq'
iq_path.write_bytes(b'\x00' * 100)
(captures_dir / 'path_test.json').write_text(json.dumps({
'id': 'pth001',
'filename': 'path_test.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:00:00Z',
}))
path = manager.get_capture_path('pth001')
assert path is not None
assert path.name == 'path_test.iq'
def test_get_capture_path_not_found(self, manager):
assert manager.get_capture_path('nonexistent') is None
def test_trim_capture_manual_segment(self, manager, tmp_data_dir):
captures_dir = tmp_data_dir / 'captures'
iq_path = captures_dir / 'trim_src.iq'
iq_path.write_bytes(bytes(range(200)) * 20) # 4000 bytes at 1000 sps => 2.0s
(captures_dir / 'trim_src.json').write_text(json.dumps({
'id': 'trim001',
'filename': 'trim_src.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 2.0,
'size_bytes': 4000,
'label': 'Weather Burst',
'bursts': [
{
'start_seconds': 0.55,
'duration_seconds': 0.2,
'peak_level': 67,
'fingerprint': 'abc123',
'modulation_hint': 'OOK/ASK',
'modulation_confidence': 0.9,
}
],
}))
result = manager.trim_capture(
capture_id='trim001',
start_seconds=0.5,
duration_seconds=0.4,
)
assert result['status'] == 'ok'
assert result['capture']['id'] != 'trim001'
assert result['capture']['size_bytes'] == 800
assert result['capture']['label'].endswith('(Trim)')
trimmed_iq = captures_dir / result['capture']['filename']
assert trimmed_iq.exists()
trimmed_meta = trimmed_iq.with_suffix('.json')
assert trimmed_meta.exists()
def test_trim_capture_auto_burst(self, manager, tmp_data_dir):
captures_dir = tmp_data_dir / 'captures'
iq_path = captures_dir / 'auto_src.iq'
iq_path.write_bytes(bytes(range(100)) * 40) # 4000 bytes
(captures_dir / 'auto_src.json').write_text(json.dumps({
'id': 'trim002',
'filename': 'auto_src.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 2.0,
'size_bytes': 4000,
'bursts': [
{'start_seconds': 0.2, 'duration_seconds': 0.1, 'peak_level': 12},
{'start_seconds': 1.2, 'duration_seconds': 0.25, 'peak_level': 88},
],
}))
result = manager.trim_capture(capture_id='trim002')
assert result['status'] == 'ok'
assert result['segment']['auto_selected'] is True
assert result['capture']['duration_seconds'] > 0.25
def test_list_captures_groups_same_fingerprint(self, manager, tmp_data_dir):
cap_a = {
'id': 'grp001',
'filename': 'a.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:00:00Z',
'dominant_fingerprint': 'deadbeefcafebabe',
}
cap_b = {
'id': 'grp002',
'filename': 'b.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:01:00Z',
'dominant_fingerprint': 'deadbeefcafebabe',
}
(tmp_data_dir / 'captures' / 'a.json').write_text(json.dumps(cap_a))
(tmp_data_dir / 'captures' / 'b.json').write_text(json.dumps(cap_b))
captures = manager.list_captures()
assert len(captures) == 2
assert all(c.fingerprint_group.startswith('SIG-') for c in captures)
assert all(c.fingerprint_group_size == 2 for c in captures)
class TestSweep:
def test_start_sweep_no_tool(self, manager):
with patch('shutil.which', return_value=None):
manager._sweep_available = None
result = manager.start_sweep()
assert result['status'] == 'error'
def test_start_sweep_success(self, manager):
mock_proc = MagicMock()
mock_proc.poll.return_value = None
mock_proc.stdout = MagicMock()
with patch('shutil.which', return_value='/usr/bin/hackrf_sweep'), \
patch('subprocess.Popen', return_value=mock_proc), \
patch('utils.subghz.register_process'):
manager._sweep_available = None
result = manager.start_sweep(freq_start_mhz=300, freq_end_mhz=928)
assert result['status'] == 'started'
# Signal daemon threads to stop so they don't outlive the test
manager._sweep_running = False
def test_stop_sweep_not_running(self, manager):
result = manager.stop_sweep()
assert result['status'] == 'not_running'
class TestDecode:
def test_start_decode_no_hackrf(self, manager):
with patch('shutil.which', return_value=None):
manager._hackrf_available = None
manager._rtl433_available = None
result = manager.start_decode(frequency_hz=433920000)
assert result['status'] == 'error'
assert 'hackrf_transfer' in result['message']
def test_start_decode_no_rtl433(self, manager):
def which_side_effect(name):
if name == 'hackrf_transfer':
return '/usr/bin/hackrf_transfer'
return None
with patch('shutil.which', side_effect=which_side_effect):
manager._hackrf_available = None
manager._rtl433_available = None
result = manager.start_decode(frequency_hz=433920000)
assert result['status'] == 'error'
assert 'rtl_433' in result['message']
def test_start_decode_success(self, manager):
mock_hackrf_proc = MagicMock()
mock_hackrf_proc.poll.return_value = None
mock_hackrf_proc.stdout = MagicMock()
mock_hackrf_proc.stderr = MagicMock()
mock_hackrf_proc.stderr.readline = MagicMock(return_value=b'')
mock_rtl433_proc = MagicMock()
mock_rtl433_proc.poll.return_value = None
mock_rtl433_proc.stdout = MagicMock()
mock_rtl433_proc.stderr = MagicMock()
mock_rtl433_proc.stderr.readline = MagicMock(return_value=b'')
call_count = [0]
def popen_side_effect(*args, **kwargs):
call_count[0] += 1
if call_count[0] == 1:
return mock_hackrf_proc
return mock_rtl433_proc
with patch('shutil.which', return_value='/usr/bin/tool'), \
patch('subprocess.Popen', side_effect=popen_side_effect) as mock_popen, \
patch('utils.subghz.register_process'):
manager._hackrf_available = None
manager._rtl433_available = None
result = manager.start_decode(
frequency_hz=433920000,
sample_rate=2000000,
)
assert result['status'] == 'started'
assert result['frequency_hz'] == 433920000
assert manager.active_mode == 'decode'
# Two processes: hackrf_transfer + rtl_433
assert mock_popen.call_count == 2
# Verify hackrf_transfer command
hackrf_cmd = mock_popen.call_args_list[0][0][0]
assert hackrf_cmd[0] == 'hackrf_transfer'
assert '-r' in hackrf_cmd
# Verify rtl_433 command
rtl433_cmd = mock_popen.call_args_list[1][0][0]
assert rtl433_cmd[0] == 'rtl_433'
assert '-r' in rtl433_cmd
assert 'cs8:-' in rtl433_cmd
# Both processes tracked
assert manager._decode_hackrf_process is mock_hackrf_proc
assert manager._decode_process is mock_rtl433_proc
# Signal daemon threads to stop so they don't outlive the test
manager._decode_stop = True
def test_stop_decode_not_running(self, manager):
result = manager.stop_decode()
assert result['status'] == 'not_running'
def test_stop_decode_terminates_both(self, manager):
mock_hackrf = MagicMock()
mock_hackrf.poll.return_value = None
mock_rtl433 = MagicMock()
mock_rtl433.poll.return_value = None
manager._decode_hackrf_process = mock_hackrf
manager._decode_process = mock_rtl433
manager._decode_frequency_hz = 433920000
with patch('utils.subghz.safe_terminate') as mock_term, \
patch('utils.subghz.unregister_process'):
result = manager.stop_decode()
assert result['status'] == 'stopped'
assert manager._decode_hackrf_process is None
assert manager._decode_process is None
assert mock_term.call_count == 2
class TestStopAll:
def test_stop_all_clears_processes(self, manager):
mock_proc = MagicMock()
mock_proc.poll.return_value = None
manager._rx_process = mock_proc
with patch('utils.subghz.safe_terminate'):
manager.stop_all()
assert manager._rx_process is None
assert manager._decode_hackrf_process is None
assert manager._decode_process is None
assert manager._tx_process is None
assert manager._sweep_process is None
class TestSubGhzCapture:
def test_to_dict(self):
cap = SubGhzCapture(
capture_id='abc123',
filename='test.iq',
frequency_hz=433920000,
sample_rate=2000000,
lna_gain=32,
vga_gain=20,
timestamp='2026-01-01T00:00:00Z',
duration_seconds=5.0,
size_bytes=1024,
label='Test',
)
d = cap.to_dict()
assert d['id'] == 'abc123'
assert d['frequency_hz'] == 433920000
assert d['label'] == 'Test'
+433
View File
@@ -0,0 +1,433 @@
"""Tests for SubGHz transceiver routes."""
from __future__ import annotations
import json
from unittest.mock import patch, MagicMock
import pytest
from utils.subghz import SubGhzCapture
@pytest.fixture
def auth_client(client):
"""Client with logged-in session."""
with client.session_transaction() as sess:
sess['logged_in'] = True
return client
class TestSubGhzRoutes:
"""Tests for /subghz/ endpoints."""
def test_get_status(self, client, auth_client):
"""GET /subghz/status returns manager status."""
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.get_status.return_value = {
'mode': 'idle',
'hackrf_available': True,
'rtl433_available': True,
'sweep_available': True,
}
mock_get.return_value = mock_mgr
response = auth_client.get('/subghz/status')
assert response.status_code == 200
data = response.get_json()
assert data['mode'] == 'idle'
assert data['hackrf_available'] is True
def test_get_presets(self, client, auth_client):
"""GET /subghz/presets returns frequency presets."""
response = auth_client.get('/subghz/presets')
assert response.status_code == 200
data = response.get_json()
assert 'presets' in data
assert '433.92 MHz' in data['presets']
assert 'sample_rates' in data
# ------ RECEIVE ------
def test_start_receive_success(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.start_receive.return_value = {
'status': 'started',
'frequency_hz': 433920000,
'sample_rate': 2000000,
}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/receive/start', json={
'frequency_hz': 433920000,
'sample_rate': 2000000,
'lna_gain': 32,
'vga_gain': 20,
})
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'started'
def test_start_receive_missing_frequency(self, client, auth_client):
response = auth_client.post('/subghz/receive/start', json={})
assert response.status_code == 400
data = response.get_json()
assert data['status'] == 'error'
def test_start_receive_invalid_frequency(self, client, auth_client):
response = auth_client.post('/subghz/receive/start', json={
'frequency_hz': 'not_a_number',
})
assert response.status_code == 400
def test_stop_receive(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.stop_receive.return_value = {'status': 'stopped'}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/receive/stop')
assert response.status_code == 200
def test_start_receive_trigger_params(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.start_receive.return_value = {'status': 'started'}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/receive/start', json={
'frequency_hz': 433920000,
'trigger_enabled': True,
'trigger_pre_ms': 400,
'trigger_post_ms': 900,
})
assert response.status_code == 200
kwargs = mock_mgr.start_receive.call_args.kwargs
assert kwargs['trigger_enabled'] is True
assert kwargs['trigger_pre_ms'] == 400
assert kwargs['trigger_post_ms'] == 900
# ------ DECODE ------
def test_start_decode_success(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.start_decode.return_value = {
'status': 'started',
'frequency_hz': 433920000,
}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/decode/start', json={
'frequency_hz': 433920000,
})
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'started'
mock_mgr.start_decode.assert_called_once()
kwargs = mock_mgr.start_decode.call_args.kwargs
assert kwargs['decode_profile'] == 'weather'
def test_start_decode_profile_all(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.start_decode.return_value = {
'status': 'started',
'frequency_hz': 433920000,
}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/decode/start', json={
'frequency_hz': 433920000,
'decode_profile': 'all',
})
assert response.status_code == 200
kwargs = mock_mgr.start_decode.call_args.kwargs
assert kwargs['decode_profile'] == 'all'
def test_start_decode_missing_freq(self, client, auth_client):
response = auth_client.post('/subghz/decode/start', json={})
assert response.status_code == 400
def test_stop_decode(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.stop_decode.return_value = {'status': 'stopped'}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/decode/stop')
assert response.status_code == 200
# ------ TRANSMIT ------
def test_transmit_missing_capture_id(self, client, auth_client):
response = auth_client.post('/subghz/transmit', json={})
assert response.status_code == 400
data = response.get_json()
assert 'capture_id is required' in data['message']
def test_transmit_invalid_capture_id(self, client, auth_client):
response = auth_client.post('/subghz/transmit', json={
'capture_id': '../../../etc/passwd',
})
assert response.status_code == 400
data = response.get_json()
assert 'Invalid' in data['message']
def test_transmit_success(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.transmit.return_value = {
'status': 'transmitting',
'capture_id': 'abc123',
'frequency_hz': 433920000,
'max_duration': 10,
}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/transmit', json={
'capture_id': 'abc123',
'tx_gain': 20,
'max_duration': 10,
})
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'transmitting'
kwargs = mock_mgr.transmit.call_args.kwargs
assert kwargs['start_seconds'] is None
assert kwargs['duration_seconds'] is None
def test_transmit_segment_params(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.transmit.return_value = {
'status': 'transmitting',
'capture_id': 'abc123',
'frequency_hz': 433920000,
'max_duration': 10,
'segment': {'start_seconds': 0.1, 'duration_seconds': 0.4},
}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/transmit', json={
'capture_id': 'abc123',
'tx_gain': 20,
'max_duration': 10,
'start_seconds': 0.1,
'duration_seconds': 0.4,
})
assert response.status_code == 200
kwargs = mock_mgr.transmit.call_args.kwargs
assert kwargs['start_seconds'] == 0.1
assert kwargs['duration_seconds'] == 0.4
def test_transmit_invalid_segment_param(self, client, auth_client):
response = auth_client.post('/subghz/transmit', json={
'capture_id': 'abc123',
'start_seconds': 'not-a-number',
})
assert response.status_code == 400
def test_stop_transmit(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.stop_transmit.return_value = {'status': 'stopped'}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/transmit/stop')
assert response.status_code == 200
# ------ SWEEP ------
def test_start_sweep_success(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.start_sweep.return_value = {
'status': 'started',
'freq_start_mhz': 300,
'freq_end_mhz': 928,
}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/sweep/start', json={
'freq_start_mhz': 300,
'freq_end_mhz': 928,
})
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'started'
def test_start_sweep_invalid_range(self, client, auth_client):
response = auth_client.post('/subghz/sweep/start', json={
'freq_start_mhz': 928,
'freq_end_mhz': 300, # start > end
})
assert response.status_code == 400
def test_stop_sweep(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.stop_sweep.return_value = {'status': 'stopped'}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/sweep/stop')
assert response.status_code == 200
# ------ CAPTURES ------
def test_list_captures_empty(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.list_captures.return_value = []
mock_get.return_value = mock_mgr
response = auth_client.get('/subghz/captures')
assert response.status_code == 200
data = response.get_json()
assert data['count'] == 0
assert data['captures'] == []
def test_list_captures_with_data(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
cap = SubGhzCapture(
capture_id='cap1',
filename='test.iq',
frequency_hz=433920000,
sample_rate=2000000,
lna_gain=32,
vga_gain=20,
timestamp='2026-01-01T00:00:00Z',
)
mock_mgr.list_captures.return_value = [cap]
mock_get.return_value = mock_mgr
response = auth_client.get('/subghz/captures')
assert response.status_code == 200
data = response.get_json()
assert data['count'] == 1
assert data['captures'][0]['id'] == 'cap1'
def test_get_capture(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
cap = SubGhzCapture(
capture_id='cap2',
filename='test2.iq',
frequency_hz=315000000,
sample_rate=2000000,
lna_gain=32,
vga_gain=20,
timestamp='2026-01-01T00:00:00Z',
)
mock_mgr.get_capture.return_value = cap
mock_get.return_value = mock_mgr
response = auth_client.get('/subghz/captures/cap2')
assert response.status_code == 200
data = response.get_json()
assert data['capture']['frequency_hz'] == 315000000
def test_get_capture_not_found(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.get_capture.return_value = None
mock_get.return_value = mock_mgr
response = auth_client.get('/subghz/captures/nonexistent')
assert response.status_code == 404
def test_get_capture_invalid_id(self, client, auth_client):
response = auth_client.get('/subghz/captures/bad-id!')
assert response.status_code == 400
def test_delete_capture(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.delete_capture.return_value = True
mock_get.return_value = mock_mgr
response = auth_client.delete('/subghz/captures/cap1')
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'deleted'
def test_trim_capture_success(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.trim_capture.return_value = {
'status': 'ok',
'capture': {
'id': 'trim_new',
'filename': 'trimmed.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
},
}
mock_get.return_value = mock_mgr
response = auth_client.post('/subghz/captures/cap1/trim', json={
'start_seconds': 0.1,
'duration_seconds': 0.3,
})
assert response.status_code == 200
kwargs = mock_mgr.trim_capture.call_args.kwargs
assert kwargs['capture_id'] == 'cap1'
assert kwargs['start_seconds'] == 0.1
assert kwargs['duration_seconds'] == 0.3
def test_trim_capture_invalid_param(self, client, auth_client):
response = auth_client.post('/subghz/captures/cap1/trim', json={
'start_seconds': 'bad',
})
assert response.status_code == 400
def test_delete_capture_not_found(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.delete_capture.return_value = False
mock_get.return_value = mock_mgr
response = auth_client.delete('/subghz/captures/nonexistent')
assert response.status_code == 404
def test_update_capture_label(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.update_capture_label.return_value = True
mock_get.return_value = mock_mgr
response = auth_client.patch('/subghz/captures/cap1', json={
'label': 'Garage Remote',
})
assert response.status_code == 200
data = response.get_json()
assert data['label'] == 'Garage Remote'
def test_update_capture_label_too_long(self, client, auth_client):
response = auth_client.patch('/subghz/captures/cap1', json={
'label': 'x' * 200,
})
assert response.status_code == 400
def test_update_capture_not_found(self, client, auth_client):
with patch('routes.subghz.get_subghz_manager') as mock_get:
mock_mgr = MagicMock()
mock_mgr.update_capture_label.return_value = False
mock_get.return_value = mock_mgr
response = auth_client.patch('/subghz/captures/nonexistent', json={
'label': 'test',
})
assert response.status_code == 404
# ------ SSE STREAM ------
def test_stream_endpoint(self, client, auth_client):
"""GET /subghz/stream returns SSE response."""
with patch('routes.subghz.sse_stream', return_value=iter([])):
response = auth_client.get('/subghz/stream')
assert response.status_code == 200
assert response.content_type.startswith('text/event-stream')