Add HackRF support to TSCM RF scan and misc improvements

TSCM RF scan now auto-detects HackRF via SDRFactory and uses
hackrf_sweep as an alternative to rtl_power. Also includes
improvements to listening post, rtlamr, weather satellite,
SubGHz, Meshtastic, SSTV, WeFax, and process monitor modules.

Fixes #154

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-25 20:58:57 +00:00
parent ee9356c358
commit ecdc060d81
12 changed files with 4630 additions and 4427 deletions
+505 -487
View File
File diff suppressed because it is too large Load Diff
+34 -21
View File
@@ -138,36 +138,34 @@ def start_rtlamr() -> Response:
output_format = data.get('format', 'json')
# Start rtl_tcp first
rtl_tcp_just_started = False
rtl_tcp_cmd_str = ''
with rtl_tcp_lock:
if not rtl_tcp_process:
logger.info("Starting rtl_tcp server...")
try:
rtl_tcp_cmd = ['rtl_tcp', '-a', '0.0.0.0']
# Add device index if not 0
if device and device != '0':
rtl_tcp_cmd.extend(['-d', str(device)])
# Add gain if not auto
if gain and gain != '0':
rtl_tcp_cmd.extend(['-g', str(gain)])
# Add PPM correction if not 0
if ppm and ppm != '0':
rtl_tcp_cmd.extend(['-p', str(ppm)])
rtl_tcp_process = subprocess.Popen(
rtl_tcp_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(rtl_tcp_process)
# Wait a moment for rtl_tcp to start
time.sleep(3)
logger.info(f"rtl_tcp started: {' '.join(rtl_tcp_cmd)}")
app_module.rtlamr_queue.put({'type': 'info', 'text': f'rtl_tcp: {" ".join(rtl_tcp_cmd)}'})
rtl_tcp_just_started = True
rtl_tcp_cmd_str = ' '.join(rtl_tcp_cmd)
except Exception as e:
logger.error(f"Failed to start rtl_tcp: {e}")
# Release SDR device on rtl_tcp failure
@@ -176,6 +174,12 @@ def start_rtlamr() -> Response:
rtlamr_active_device = None
return jsonify({'status': 'error', 'message': f'Failed to start rtl_tcp: {e}'}), 500
# Wait for rtl_tcp to start outside lock
if rtl_tcp_just_started:
time.sleep(3)
logger.info(f"rtl_tcp started: {rtl_tcp_cmd_str}")
app_module.rtlamr_queue.put({'type': 'info', 'text': f'rtl_tcp: {rtl_tcp_cmd_str}'})
# Build rtlamr command
cmd = [
'rtlamr',
@@ -258,25 +262,34 @@ def start_rtlamr() -> Response:
def stop_rtlamr() -> Response:
global rtl_tcp_process, rtlamr_active_device
# Grab process refs inside locks, clear state, then terminate outside
rtlamr_proc = None
with app_module.rtlamr_lock:
if app_module.rtlamr_process:
app_module.rtlamr_process.terminate()
try:
app_module.rtlamr_process.wait(timeout=2)
except subprocess.TimeoutExpired:
app_module.rtlamr_process.kill()
rtlamr_proc = app_module.rtlamr_process
app_module.rtlamr_process = None
if rtlamr_proc:
rtlamr_proc.terminate()
try:
rtlamr_proc.wait(timeout=2)
except subprocess.TimeoutExpired:
rtlamr_proc.kill()
# Also stop rtl_tcp
tcp_proc = None
with rtl_tcp_lock:
if rtl_tcp_process:
rtl_tcp_process.terminate()
try:
rtl_tcp_process.wait(timeout=2)
except subprocess.TimeoutExpired:
rtl_tcp_process.kill()
tcp_proc = rtl_tcp_process
rtl_tcp_process = None
logger.info("rtl_tcp stopped")
if tcp_proc:
tcp_proc.terminate()
try:
tcp_proc.wait(timeout=2)
except subprocess.TimeoutExpired:
tcp_proc.kill()
logger.info("rtl_tcp stopped")
# Release device from registry
if rtlamr_active_device is not None:
+71 -44
View File
@@ -1345,7 +1345,7 @@ def _scan_rf_signals(
sweep_ranges: list[dict] | None = None
) -> list[dict]:
"""
Scan for RF signals using SDR (rtl_power).
Scan for RF signals using SDR (rtl_power or hackrf_sweep).
Scans common surveillance frequency bands:
- 88-108 MHz: FM broadcast (potential FM bugs)
@@ -1375,39 +1375,50 @@ def _scan_rf_signals(
logger.info(f"Starting RF scan (device={sdr_device})")
# Detect available SDR devices and sweep tools
rtl_power_path = shutil.which('rtl_power')
if not rtl_power_path:
logger.warning("rtl_power not found in PATH, RF scanning unavailable")
hackrf_sweep_path = shutil.which('hackrf_sweep')
sdr_type = None
sweep_tool_path = None
try:
from utils.sdr import SDRFactory
from utils.sdr.base import SDRType
devices = SDRFactory.detect_devices()
rtlsdr_available = any(d.sdr_type == SDRType.RTL_SDR for d in devices)
hackrf_available = any(d.sdr_type == SDRType.HACKRF for d in devices)
except ImportError:
rtlsdr_available = False
hackrf_available = False
# Pick the best available SDR + sweep tool combo
if rtlsdr_available and rtl_power_path:
sdr_type = 'rtlsdr'
sweep_tool_path = rtl_power_path
logger.info(f"Using RTL-SDR with rtl_power at: {rtl_power_path}")
elif hackrf_available and hackrf_sweep_path:
sdr_type = 'hackrf'
sweep_tool_path = hackrf_sweep_path
logger.info(f"Using HackRF with hackrf_sweep at: {hackrf_sweep_path}")
elif rtl_power_path:
# Tool exists but no device detected — try anyway (detection may have failed)
sdr_type = 'rtlsdr'
sweep_tool_path = rtl_power_path
logger.info(f"No SDR detected but rtl_power found, attempting RTL-SDR scan")
elif hackrf_sweep_path:
sdr_type = 'hackrf'
sweep_tool_path = hackrf_sweep_path
logger.info(f"No SDR detected but hackrf_sweep found, attempting HackRF scan")
if not sweep_tool_path:
logger.warning("No supported sweep tool found (rtl_power or hackrf_sweep)")
_emit_event('rf_status', {
'status': 'error',
'message': 'rtl_power not installed. Install rtl-sdr package for RF scanning.',
'message': 'No SDR sweep tool installed. Install rtl-sdr (rtl_power) or HackRF (hackrf_sweep) for RF scanning.',
})
return signals
logger.info(f"Found rtl_power at: {rtl_power_path}")
# Test if RTL-SDR device is accessible
rtl_test_path = shutil.which('rtl_test')
if rtl_test_path:
try:
test_result = subprocess.run(
[rtl_test_path, '-t'],
capture_output=True,
text=True,
timeout=5
)
if 'No supported devices found' in test_result.stderr or test_result.returncode != 0:
logger.warning("No RTL-SDR device found")
_emit_event('rf_status', {
'status': 'error',
'message': 'No RTL-SDR device connected. Connect an RTL-SDR dongle for RF scanning.',
})
return signals
except subprocess.TimeoutExpired:
pass # Device might be busy, continue anyway
except Exception as e:
logger.debug(f"rtl_test check failed: {e}")
# Define frequency bands to scan (in Hz)
# Format: (start_freq, end_freq, bin_size, description)
scan_bands: list[tuple[int, int, int, str]] = []
@@ -1448,7 +1459,7 @@ def _scan_rf_signals(
try:
# Build device argument
device_arg = ['-d', str(sdr_device if sdr_device is not None else 0)]
device_idx = sdr_device if sdr_device is not None else 0
# Scan each band and look for strong signals
for start_freq, end_freq, bin_size, band_name in scan_bands:
@@ -1458,15 +1469,27 @@ def _scan_rf_signals(
logger.info(f"Scanning {band_name} ({start_freq/1e6:.1f}-{end_freq/1e6:.1f} MHz)")
try:
# Run rtl_power for a quick sweep of this band
cmd = [
rtl_power_path,
'-f', f'{start_freq}:{end_freq}:{bin_size}',
'-g', '40', # Gain
'-i', '1', # Integration interval (1 second)
'-1', # Single shot mode
'-c', '20%', # Crop 20% of edges
] + device_arg + [tmp_path]
# Build sweep command based on SDR type
if sdr_type == 'hackrf':
cmd = [
sweep_tool_path,
'-f', f'{int(start_freq / 1e6)}:{int(end_freq / 1e6)}',
'-w', str(bin_size),
'-1', # Single sweep
]
output_mode = 'stdout'
else:
cmd = [
sweep_tool_path,
'-f', f'{start_freq}:{end_freq}:{bin_size}',
'-g', '40', # Gain
'-i', '1', # Integration interval (1 second)
'-1', # Single shot mode
'-c', '20%', # Crop 20% of edges
'-d', str(device_idx),
tmp_path,
]
output_mode = 'file'
logger.debug(f"Running: {' '.join(cmd)}")
@@ -1478,9 +1501,14 @@ def _scan_rf_signals(
)
if result.returncode != 0:
logger.warning(f"rtl_power returned {result.returncode}: {result.stderr}")
logger.warning(f"{os.path.basename(sweep_tool_path)} returned {result.returncode}: {result.stderr}")
# Parse the CSV output
# For HackRF, write stdout CSV data to temp file for unified parsing
if output_mode == 'stdout' and result.stdout:
with open(tmp_path, 'w') as f:
f.write(result.stdout)
# Parse the CSV output (same format for both rtl_power and hackrf_sweep)
if os.path.exists(tmp_path) and os.path.getsize(tmp_path) > 0:
with open(tmp_path, 'r') as f:
for line in f:
@@ -1488,13 +1516,12 @@ def _scan_rf_signals(
if len(parts) >= 7:
try:
# CSV format: date, time, hz_low, hz_high, hz_step, samples, db_values...
hz_low = int(parts[2])
hz_high = int(parts[3])
hz_step = float(parts[4])
hz_low = int(parts[2].strip())
hz_high = int(parts[3].strip())
hz_step = float(parts[4].strip())
db_values = [float(x) for x in parts[6:] if x.strip()]
# Find peaks above noise floor
# RTL-SDR dongles have higher noise figures, so use permissive thresholds
noise_floor = sum(db_values) / len(db_values) if db_values else -100
threshold = noise_floor + 6 # Signal must be 6dB above noise
File diff suppressed because it is too large Load Diff
+214 -188
View File
@@ -76,12 +76,12 @@ class TestReceive:
mock_proc.stderr = MagicMock()
mock_proc.stderr.readline = MagicMock(return_value=b'')
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch('subprocess.Popen', return_value=mock_proc), \
patch.object(manager, 'check_hackrf_device', return_value=True), \
patch('utils.subghz.register_process'):
manager._hackrf_available = None
result = manager.start_receive(
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch('subprocess.Popen', return_value=mock_proc), \
patch.object(manager, 'check_hackrf_device', return_value=True), \
patch('utils.subghz.register_process'):
manager._hackrf_available = None
result = manager.start_receive(
frequency_hz=433920000,
sample_rate=2000000,
lna_gain=32,
@@ -92,9 +92,14 @@ class TestReceive:
assert manager.active_mode == 'rx'
def test_start_receive_already_running(self, manager):
import time as _time
mock_proc = MagicMock()
mock_proc.poll.return_value = None
manager._rx_process = mock_proc
# Pre-lock device checks now run before active_mode guard
manager._hackrf_available = True
manager._hackrf_device_cache = True
manager._hackrf_device_cache_ts = _time.time()
result = manager.start_receive(frequency_hz=433920000)
assert result['status'] == 'error'
@@ -104,10 +109,10 @@ class TestReceive:
result = manager.stop_receive()
assert result['status'] == 'not_running'
def test_stop_receive_creates_metadata(self, manager, tmp_data_dir):
# Create a fake IQ file
iq_file = tmp_data_dir / 'captures' / 'test.iq'
iq_file.write_bytes(b'\x00' * 1024)
def test_stop_receive_creates_metadata(self, manager, tmp_data_dir):
# Create a fake IQ file
iq_file = tmp_data_dir / 'captures' / 'test.iq'
iq_file.write_bytes(b'\x00' * 1024)
mock_proc = MagicMock()
mock_proc.poll.return_value = None
@@ -115,10 +120,10 @@ class TestReceive:
manager._rx_file = iq_file
manager._rx_frequency_hz = 433920000
manager._rx_sample_rate = 2000000
manager._rx_lna_gain = 32
manager._rx_vga_gain = 20
manager._rx_start_time = 1000.0
manager._rx_bursts = [{'start_seconds': 1.23, 'duration_seconds': 0.15, 'peak_level': 42}]
manager._rx_lna_gain = 32
manager._rx_vga_gain = 20
manager._rx_start_time = 1000.0
manager._rx_bursts = [{'start_seconds': 1.23, 'duration_seconds': 0.15, 'peak_level': 42}]
with patch('utils.subghz.safe_terminate'), \
patch('time.time', return_value=1005.0):
@@ -131,10 +136,10 @@ class TestReceive:
# Verify JSON sidecar was written
meta_path = iq_file.with_suffix('.json')
assert meta_path.exists()
meta = json.loads(meta_path.read_text())
assert meta['frequency_hz'] == 433920000
assert isinstance(meta.get('bursts'), list)
assert meta['bursts'][0]['peak_level'] == 42
meta = json.loads(meta_path.read_text())
assert meta['frequency_hz'] == 433920000
assert isinstance(meta.get('bursts'), list)
assert meta['bursts'][0]['peak_level'] == 42
class TestTxSafety:
@@ -165,13 +170,13 @@ class TestTxSafety:
result = manager.transmit(capture_id='abc123')
assert result['status'] == 'error'
def test_transmit_capture_not_found(self, manager):
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True):
manager._hackrf_available = None
result = manager.transmit(capture_id='nonexistent')
assert result['status'] == 'error'
assert 'not found' in result['message']
def test_transmit_capture_not_found(self, manager):
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True):
manager._hackrf_available = None
result = manager.transmit(capture_id='nonexistent')
assert result['status'] == 'error'
assert 'not found' in result['message']
def test_transmit_out_of_band_rejected(self, manager, tmp_data_dir):
# Create a capture with out-of-band frequency
@@ -188,64 +193,79 @@ class TestTxSafety:
meta_path.write_text(json.dumps(meta))
(tmp_data_dir / 'captures' / 'test.iq').write_bytes(b'\x00' * 100)
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True):
manager._hackrf_available = None
result = manager.transmit(capture_id='test123')
assert result['status'] == 'error'
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True):
manager._hackrf_available = None
result = manager.transmit(capture_id='test123')
assert result['status'] == 'error'
assert 'outside allowed TX bands' in result['message']
def test_transmit_already_running(self, manager):
mock_proc = MagicMock()
mock_proc.poll.return_value = None
manager._rx_process = mock_proc
result = manager.transmit(capture_id='test123')
assert result['status'] == 'error'
assert 'Already running' in result['message']
def test_transmit_segment_extracts_range(self, manager, tmp_data_dir):
meta = {
'id': 'seg001',
'filename': 'seg.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 1.0,
'size_bytes': 2000,
}
(tmp_data_dir / 'captures' / 'seg.json').write_text(json.dumps(meta))
(tmp_data_dir / 'captures' / 'seg.iq').write_bytes(bytes(range(200)) * 10)
mock_proc = MagicMock()
mock_proc.poll.return_value = None
mock_timer = MagicMock()
mock_timer.start = MagicMock()
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True), \
patch('subprocess.Popen', return_value=mock_proc), \
patch('utils.subghz.register_process'), \
patch('threading.Timer', return_value=mock_timer), \
patch('threading.Thread') as mock_thread_cls:
mock_thread = MagicMock()
mock_thread.start = MagicMock()
mock_thread_cls.return_value = mock_thread
manager._hackrf_available = None
result = manager.transmit(
capture_id='seg001',
start_seconds=0.2,
duration_seconds=0.3,
)
assert result['status'] == 'transmitting'
assert result['segment'] is not None
assert result['segment']['duration_seconds'] == pytest.approx(0.3, abs=0.01)
assert manager._tx_temp_file is not None
assert manager._tx_temp_file.exists()
def test_transmit_already_running(self, manager, tmp_data_dir):
import time as _time
mock_proc = MagicMock()
mock_proc.poll.return_value = None
manager._rx_process = mock_proc
# Pre-lock device checks now run before active_mode guard
manager._hackrf_available = True
manager._hackrf_device_cache = True
manager._hackrf_device_cache_ts = _time.time()
# Capture lookup also runs pre-lock now; provide a valid capture + IQ file
meta = {
'id': 'test123',
'filename': 'test.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2025-01-01T00:00:00',
}
(tmp_data_dir / 'captures' / 'test.json').write_text(json.dumps(meta))
(tmp_data_dir / 'captures' / 'test.iq').write_bytes(b'\x00' * 64)
result = manager.transmit(capture_id='test123')
assert result['status'] == 'error'
assert 'Already running' in result['message']
def test_transmit_segment_extracts_range(self, manager, tmp_data_dir):
meta = {
'id': 'seg001',
'filename': 'seg.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 1.0,
'size_bytes': 2000,
}
(tmp_data_dir / 'captures' / 'seg.json').write_text(json.dumps(meta))
(tmp_data_dir / 'captures' / 'seg.iq').write_bytes(bytes(range(200)) * 10)
mock_proc = MagicMock()
mock_proc.poll.return_value = None
mock_timer = MagicMock()
mock_timer.start = MagicMock()
with patch('shutil.which', return_value='/usr/bin/hackrf_transfer'), \
patch.object(manager, 'check_hackrf_device', return_value=True), \
patch('subprocess.Popen', return_value=mock_proc), \
patch('utils.subghz.register_process'), \
patch('threading.Timer', return_value=mock_timer), \
patch('threading.Thread') as mock_thread_cls:
mock_thread = MagicMock()
mock_thread.start = MagicMock()
mock_thread_cls.return_value = mock_thread
manager._hackrf_available = None
result = manager.transmit(
capture_id='seg001',
start_seconds=0.2,
duration_seconds=0.3,
)
assert result['status'] == 'transmitting'
assert result['segment'] is not None
assert result['segment']['duration_seconds'] == pytest.approx(0.3, abs=0.01)
assert manager._tx_temp_file is not None
assert manager._tx_temp_file.exists()
class TestCaptureLibrary:
@@ -311,11 +331,11 @@ class TestCaptureLibrary:
def test_delete_capture_not_found(self, manager):
assert manager.delete_capture('nonexistent') is False
def test_update_label(self, manager, tmp_data_dir):
meta = {
'id': 'lbl001',
'filename': 'label_test.iq',
'frequency_hz': 433920000,
def test_update_label(self, manager, tmp_data_dir):
meta = {
'id': 'lbl001',
'filename': 'label_test.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:00:00Z',
'label': '',
@@ -324,10 +344,10 @@ class TestCaptureLibrary:
meta_path.write_text(json.dumps(meta))
assert manager.update_capture_label('lbl001', 'Garage Remote') is True
updated = json.loads(meta_path.read_text())
assert updated['label'] == 'Garage Remote'
assert updated['label_source'] == 'manual'
updated = json.loads(meta_path.read_text())
assert updated['label'] == 'Garage Remote'
assert updated['label_source'] == 'manual'
def test_update_label_not_found(self, manager):
assert manager.update_capture_label('nonexistent', 'test') is False
@@ -348,100 +368,100 @@ class TestCaptureLibrary:
assert path is not None
assert path.name == 'path_test.iq'
def test_get_capture_path_not_found(self, manager):
assert manager.get_capture_path('nonexistent') is None
def test_trim_capture_manual_segment(self, manager, tmp_data_dir):
captures_dir = tmp_data_dir / 'captures'
iq_path = captures_dir / 'trim_src.iq'
iq_path.write_bytes(bytes(range(200)) * 20) # 4000 bytes at 1000 sps => 2.0s
(captures_dir / 'trim_src.json').write_text(json.dumps({
'id': 'trim001',
'filename': 'trim_src.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 2.0,
'size_bytes': 4000,
'label': 'Weather Burst',
'bursts': [
{
'start_seconds': 0.55,
'duration_seconds': 0.2,
'peak_level': 67,
'fingerprint': 'abc123',
'modulation_hint': 'OOK/ASK',
'modulation_confidence': 0.9,
}
],
}))
result = manager.trim_capture(
capture_id='trim001',
start_seconds=0.5,
duration_seconds=0.4,
)
assert result['status'] == 'ok'
assert result['capture']['id'] != 'trim001'
assert result['capture']['size_bytes'] == 800
assert result['capture']['label'].endswith('(Trim)')
trimmed_iq = captures_dir / result['capture']['filename']
assert trimmed_iq.exists()
trimmed_meta = trimmed_iq.with_suffix('.json')
assert trimmed_meta.exists()
def test_trim_capture_auto_burst(self, manager, tmp_data_dir):
captures_dir = tmp_data_dir / 'captures'
iq_path = captures_dir / 'auto_src.iq'
iq_path.write_bytes(bytes(range(100)) * 40) # 4000 bytes
(captures_dir / 'auto_src.json').write_text(json.dumps({
'id': 'trim002',
'filename': 'auto_src.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 2.0,
'size_bytes': 4000,
'bursts': [
{'start_seconds': 0.2, 'duration_seconds': 0.1, 'peak_level': 12},
{'start_seconds': 1.2, 'duration_seconds': 0.25, 'peak_level': 88},
],
}))
result = manager.trim_capture(capture_id='trim002')
assert result['status'] == 'ok'
assert result['segment']['auto_selected'] is True
assert result['capture']['duration_seconds'] > 0.25
def test_list_captures_groups_same_fingerprint(self, manager, tmp_data_dir):
cap_a = {
'id': 'grp001',
'filename': 'a.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:00:00Z',
'dominant_fingerprint': 'deadbeefcafebabe',
}
cap_b = {
'id': 'grp002',
'filename': 'b.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:01:00Z',
'dominant_fingerprint': 'deadbeefcafebabe',
}
(tmp_data_dir / 'captures' / 'a.json').write_text(json.dumps(cap_a))
(tmp_data_dir / 'captures' / 'b.json').write_text(json.dumps(cap_b))
captures = manager.list_captures()
assert len(captures) == 2
assert all(c.fingerprint_group.startswith('SIG-') for c in captures)
assert all(c.fingerprint_group_size == 2 for c in captures)
def test_get_capture_path_not_found(self, manager):
assert manager.get_capture_path('nonexistent') is None
def test_trim_capture_manual_segment(self, manager, tmp_data_dir):
captures_dir = tmp_data_dir / 'captures'
iq_path = captures_dir / 'trim_src.iq'
iq_path.write_bytes(bytes(range(200)) * 20) # 4000 bytes at 1000 sps => 2.0s
(captures_dir / 'trim_src.json').write_text(json.dumps({
'id': 'trim001',
'filename': 'trim_src.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 2.0,
'size_bytes': 4000,
'label': 'Weather Burst',
'bursts': [
{
'start_seconds': 0.55,
'duration_seconds': 0.2,
'peak_level': 67,
'fingerprint': 'abc123',
'modulation_hint': 'OOK/ASK',
'modulation_confidence': 0.9,
}
],
}))
result = manager.trim_capture(
capture_id='trim001',
start_seconds=0.5,
duration_seconds=0.4,
)
assert result['status'] == 'ok'
assert result['capture']['id'] != 'trim001'
assert result['capture']['size_bytes'] == 800
assert result['capture']['label'].endswith('(Trim)')
trimmed_iq = captures_dir / result['capture']['filename']
assert trimmed_iq.exists()
trimmed_meta = trimmed_iq.with_suffix('.json')
assert trimmed_meta.exists()
def test_trim_capture_auto_burst(self, manager, tmp_data_dir):
captures_dir = tmp_data_dir / 'captures'
iq_path = captures_dir / 'auto_src.iq'
iq_path.write_bytes(bytes(range(100)) * 40) # 4000 bytes
(captures_dir / 'auto_src.json').write_text(json.dumps({
'id': 'trim002',
'filename': 'auto_src.iq',
'frequency_hz': 433920000,
'sample_rate': 1000,
'lna_gain': 24,
'vga_gain': 20,
'timestamp': '2026-01-01T00:00:00Z',
'duration_seconds': 2.0,
'size_bytes': 4000,
'bursts': [
{'start_seconds': 0.2, 'duration_seconds': 0.1, 'peak_level': 12},
{'start_seconds': 1.2, 'duration_seconds': 0.25, 'peak_level': 88},
],
}))
result = manager.trim_capture(capture_id='trim002')
assert result['status'] == 'ok'
assert result['segment']['auto_selected'] is True
assert result['capture']['duration_seconds'] > 0.25
def test_list_captures_groups_same_fingerprint(self, manager, tmp_data_dir):
cap_a = {
'id': 'grp001',
'filename': 'a.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:00:00Z',
'dominant_fingerprint': 'deadbeefcafebabe',
}
cap_b = {
'id': 'grp002',
'filename': 'b.iq',
'frequency_hz': 433920000,
'sample_rate': 2000000,
'timestamp': '2026-01-01T00:01:00Z',
'dominant_fingerprint': 'deadbeefcafebabe',
}
(tmp_data_dir / 'captures' / 'a.json').write_text(json.dumps(cap_a))
(tmp_data_dir / 'captures' / 'b.json').write_text(json.dumps(cap_b))
captures = manager.list_captures()
assert len(captures) == 2
assert all(c.fingerprint_group.startswith('SIG-') for c in captures)
assert all(c.fingerprint_group_size == 2 for c in captures)
class TestSweep:
@@ -452,6 +472,7 @@ class TestSweep:
assert result['status'] == 'error'
def test_start_sweep_success(self, manager):
import time as _time
mock_proc = MagicMock()
mock_proc.poll.return_value = None
mock_proc.stdout = MagicMock()
@@ -460,6 +481,8 @@ class TestSweep:
patch('subprocess.Popen', return_value=mock_proc), \
patch('utils.subghz.register_process'):
manager._sweep_available = None
manager._hackrf_device_cache = True
manager._hackrf_device_cache_ts = _time.time()
result = manager.start_sweep(freq_start_mhz=300, freq_end_mhz=928)
assert result['status'] == 'started'
@@ -517,8 +540,11 @@ class TestDecode:
with patch('shutil.which', return_value='/usr/bin/tool'), \
patch('subprocess.Popen', side_effect=popen_side_effect) as mock_popen, \
patch('utils.subghz.register_process'):
import time as _time
manager._hackrf_available = None
manager._rtl433_available = None
manager._hackrf_device_cache = True
manager._hackrf_device_cache_ts = _time.time()
result = manager.start_decode(
frequency_hz=433920000,
sample_rate=2000000,
@@ -536,10 +562,10 @@ class TestDecode:
assert '-r' in hackrf_cmd
# Verify rtl_433 command
rtl433_cmd = mock_popen.call_args_list[1][0][0]
assert rtl433_cmd[0] == 'rtl_433'
assert '-r' in rtl433_cmd
assert 'cs8:-' in rtl433_cmd
rtl433_cmd = mock_popen.call_args_list[1][0][0]
assert rtl433_cmd[0] == 'rtl_433'
assert '-r' in rtl433_cmd
assert 'cs8:-' in rtl433_cmd
# Both processes tracked
assert manager._decode_hackrf_process is mock_hackrf_proc
+2 -1
View File
@@ -138,7 +138,8 @@ class TestWeatherSatDecoder:
@patch('pty.openpty')
def test_start_already_running(self, mock_pty, mock_popen):
"""start() should return True when already running."""
with patch('shutil.which', return_value='/usr/bin/satdump'):
with patch('shutil.which', return_value='/usr/bin/satdump'), \
patch('utils.weather_sat.WeatherSatDecoder._resolve_device_id', return_value='0'):
decoder = WeatherSatDecoder()
decoder._running = True
+59 -40
View File
@@ -376,63 +376,82 @@ class MeshtasticClient:
self._error = "Meshtastic SDK not installed. Install with: pip install meshtastic"
return False
# Quick check under lock — bail if already running
with self._lock:
if self._running:
return True
try:
# Subscribe to message events before connecting
pub.subscribe(self._on_receive, "meshtastic.receive")
pub.subscribe(self._on_connection, "meshtastic.connection.established")
pub.subscribe(self._on_disconnect, "meshtastic.connection.lost")
# Create interface outside lock (blocking I/O: serial/TCP connect)
new_interface = None
new_device_path = None
new_connection_type = None
try:
# Subscribe to message events before connecting
pub.subscribe(self._on_receive, "meshtastic.receive")
pub.subscribe(self._on_connection, "meshtastic.connection.established")
pub.subscribe(self._on_disconnect, "meshtastic.connection.lost")
# Connect based on connection type
if connection_type == 'tcp':
if not hostname:
self._error = "Hostname is required for TCP connections"
self._cleanup_subscriptions()
return False
self._interface = meshtastic.tcp_interface.TCPInterface(hostname=hostname)
self._device_path = hostname
self._connection_type = 'tcp'
logger.info(f"Connected to Meshtastic device via TCP: {hostname}")
if connection_type == 'tcp':
if not hostname:
self._error = "Hostname is required for TCP connections"
self._cleanup_subscriptions()
return False
new_interface = meshtastic.tcp_interface.TCPInterface(hostname=hostname)
new_device_path = hostname
new_connection_type = 'tcp'
logger.info(f"Connected to Meshtastic device via TCP: {hostname}")
else:
if device:
new_interface = meshtastic.serial_interface.SerialInterface(device)
new_device_path = device
else:
# Serial connection (default)
if device:
self._interface = meshtastic.serial_interface.SerialInterface(device)
self._device_path = device
else:
# Auto-discover
self._interface = meshtastic.serial_interface.SerialInterface()
self._device_path = "auto"
self._connection_type = 'serial'
logger.info(f"Connected to Meshtastic device via serial: {self._device_path}")
new_interface = meshtastic.serial_interface.SerialInterface()
new_device_path = "auto"
new_connection_type = 'serial'
logger.info(f"Connected to Meshtastic device via serial: {new_device_path}")
except Exception as e:
self._error = str(e)
logger.error(f"Failed to connect to Meshtastic: {e}")
self._cleanup_subscriptions()
return False
self._running = True
self._error = None
# Install interface under lock
with self._lock:
if self._running:
# Another thread connected while we were connecting — discard ours
if new_interface:
try:
new_interface.close()
except Exception:
pass
return True
except Exception as e:
self._error = str(e)
logger.error(f"Failed to connect to Meshtastic: {e}")
self._cleanup_subscriptions()
return False
self._interface = new_interface
self._device_path = new_device_path
self._connection_type = new_connection_type
self._running = True
self._error = None
return True
def disconnect(self) -> None:
"""Disconnect from the Meshtastic device."""
iface_to_close = None
with self._lock:
if self._interface:
try:
self._interface.close()
except Exception as e:
logger.warning(f"Error closing Meshtastic interface: {e}")
self._interface = None
iface_to_close = self._interface
self._interface = None
self._cleanup_subscriptions()
self._running = False
self._device_path = None
self._connection_type = None
logger.info("Disconnected from Meshtastic device")
# Close interface outside lock (blocking I/O)
if iface_to_close:
try:
iface_to_close.close()
except Exception as e:
logger.warning(f"Error closing Meshtastic interface: {e}")
logger.info("Disconnected from Meshtastic device")
def _cleanup_subscriptions(self) -> None:
"""Unsubscribe from pubsub topics."""
+16 -7
View File
@@ -112,6 +112,8 @@ class ProcessMonitor:
def _check_all_processes(self) -> None:
"""Check health of all registered processes."""
# Collect crashed processes under lock, handle restarts outside
crashed: list[tuple[str, ProcessInfo]] = []
with self._lock:
for name, info in list(self.processes.items()):
if not info.enabled:
@@ -126,10 +128,14 @@ class ProcessMonitor:
logger.warning(
f"Process '{name}' terminated with code {return_code}"
)
self._handle_crash(name, info)
crashed.append((name, info))
# Handle restarts outside lock (involves sleeps and callbacks)
for name, info in crashed:
self._handle_crash(name, info)
def _handle_crash(self, name: str, info: ProcessInfo) -> None:
"""Handle a crashed process."""
"""Handle a crashed process. Must be called WITHOUT holding self._lock."""
if info.restart_callback is None:
logger.info(f"No restart callback for '{name}', skipping auto-restart")
return
@@ -139,7 +145,8 @@ class ProcessMonitor:
f"Process '{name}' exceeded max restarts ({info.max_restarts}), "
"disabling auto-restart"
)
info.enabled = False
with self._lock:
info.enabled = False
return
# Calculate backoff with exponential increase
@@ -149,18 +156,20 @@ class ProcessMonitor:
f"(attempt {info.restart_count + 1}/{info.max_restarts})"
)
# Wait for backoff period
# Wait for backoff period outside lock
time.sleep(backoff)
# Attempt restart
try:
info.restart_callback()
info.restart_count += 1
info.last_restart = datetime.now()
with self._lock:
info.restart_count += 1
info.last_restart = datetime.now()
logger.info(f"Successfully restarted '{name}'")
except Exception as e:
logger.error(f"Failed to restart '{name}': {e}")
info.restart_count += 1
with self._lock:
info.restart_count += 1
def get_status(self) -> Dict[str, Any]:
"""
+58 -36
View File
@@ -552,15 +552,20 @@ class SSTVDecoder:
# Clean up if the thread exits while we thought we were running.
# This prevents a "ghost running" state where is_running is True
# but the thread has already died (e.g. rtl_fm exited).
orphan_proc = None
with self._lock:
was_running = self._running
self._running = False
if was_running and self._rtl_process:
with contextlib.suppress(Exception):
self._rtl_process.terminate()
self._rtl_process.wait(timeout=2)
orphan_proc = self._rtl_process
self._rtl_process = None
# Terminate outside lock to avoid blocking other operations
if orphan_proc:
with contextlib.suppress(Exception):
orphan_proc.terminate()
orphan_proc.wait(timeout=2)
if was_running:
logger.warning("Audio decode thread stopped unexpectedly")
err_detail = rtl_fm_error.split('\n')[-1] if rtl_fm_error else ''
@@ -661,38 +666,52 @@ class SSTVDecoder:
def _retune_rtl_fm(self, new_freq_hz: int) -> None:
"""Retune rtl_fm to a new frequency by restarting the process."""
old_proc = None
with self._lock:
if not self._running:
return
old_proc = self._rtl_process
self._rtl_process = None
if self._rtl_process:
try:
self._rtl_process.terminate()
self._rtl_process.wait(timeout=2)
except Exception:
with contextlib.suppress(Exception):
self._rtl_process.kill()
# Terminate old process outside lock
if old_proc:
try:
old_proc.terminate()
old_proc.wait(timeout=2)
except Exception:
with contextlib.suppress(Exception):
old_proc.kill()
rtl_cmd = [
'rtl_fm',
'-d', str(self._device_index),
'-f', str(new_freq_hz),
'-M', self._modulation,
'-s', str(SAMPLE_RATE),
'-r', str(SAMPLE_RATE),
'-l', '0',
'-'
]
# Build and start new process outside lock
rtl_cmd = [
'rtl_fm',
'-d', str(self._device_index),
'-f', str(new_freq_hz),
'-M', self._modulation,
'-s', str(SAMPLE_RATE),
'-r', str(SAMPLE_RATE),
'-l', '0',
'-'
]
logger.debug(f"Restarting rtl_fm: {' '.join(rtl_cmd)}")
logger.debug(f"Restarting rtl_fm: {' '.join(rtl_cmd)}")
self._rtl_process = subprocess.Popen(
rtl_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
new_proc = subprocess.Popen(
rtl_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self._current_tuned_freq_hz = new_freq_hz
# Re-acquire lock to install new process
with self._lock:
if self._running:
self._rtl_process = new_proc
self._current_tuned_freq_hz = new_freq_hz
else:
# stop() was called during retune — clean up new process
with contextlib.suppress(Exception):
new_proc.terminate()
new_proc.wait(timeout=2)
@property
def last_doppler_info(self) -> DopplerInfo | None:
@@ -706,19 +725,22 @@ class SSTVDecoder:
def stop(self) -> None:
"""Stop SSTV decoder."""
proc_to_terminate = None
with self._lock:
self._running = False
proc_to_terminate = self._rtl_process
self._rtl_process = None
if self._rtl_process:
try:
self._rtl_process.terminate()
self._rtl_process.wait(timeout=5)
except Exception:
with contextlib.suppress(Exception):
self._rtl_process.kill()
self._rtl_process = None
# Terminate outside lock to avoid blocking other operations
if proc_to_terminate:
try:
proc_to_terminate.terminate()
proc_to_terminate.wait(timeout=5)
except Exception:
with contextlib.suppress(Exception):
proc_to_terminate.kill()
logger.info("SSTV decoder stopped")
logger.info("SSTV decoder stopped")
def get_images(self) -> list[SSTVImage]:
"""Get list of decoded images."""
+2225 -2191
View File
File diff suppressed because it is too large Load Diff
+105 -94
View File
@@ -173,7 +173,7 @@ class WeatherSatDecoder:
self._current_frequency: float = 0.0
self._current_mode: str = ''
self._capture_start_time: float = 0
self._device_index: int = -1
self._device_index: int = -1
self._capture_output_dir: Path | None = None
self._on_complete_callback: Callable[[], None] | None = None
self._capture_phase: str = 'idle'
@@ -303,13 +303,13 @@ class WeatherSatDecoder:
))
return False
self._current_satellite = satellite
self._current_frequency = sat_info['frequency']
self._current_mode = sat_info['mode']
self._device_index = -1 # Offline decode does not claim an SDR device
self._capture_start_time = time.time()
self._capture_phase = 'decoding'
self._stop_event.clear()
self._current_satellite = satellite
self._current_frequency = sat_info['frequency']
self._current_mode = sat_info['mode']
self._device_index = -1 # Offline decode does not claim an SDR device
self._capture_start_time = time.time()
self._capture_phase = 'decoding'
self._stop_event.clear()
try:
self._running = True
@@ -363,6 +363,20 @@ class WeatherSatDecoder:
Returns:
True if started successfully
"""
# Validate satellite BEFORE acquiring the lock
sat_info = WEATHER_SATELLITES.get(satellite)
if not sat_info:
logger.error(f"Unknown satellite: {satellite}")
self._emit_progress(CaptureProgress(
status='error',
message=f'Unknown satellite: {satellite}'
))
return False
# Resolve device ID BEFORE lock — this runs rtl_test which can
# take up to 5s and has no side effects on instance state.
source_id = self._resolve_device_id(device_index)
with self._lock:
if self._running:
return True
@@ -375,15 +389,6 @@ class WeatherSatDecoder:
))
return False
sat_info = WEATHER_SATELLITES.get(satellite)
if not sat_info:
logger.error(f"Unknown satellite: {satellite}")
self._emit_progress(CaptureProgress(
status='error',
message=f'Unknown satellite: {satellite}'
))
return False
self._current_satellite = satellite
self._current_frequency = sat_info['frequency']
self._current_mode = sat_info['mode']
@@ -394,7 +399,7 @@ class WeatherSatDecoder:
try:
self._running = True
self._start_satdump(sat_info, device_index, gain, sample_rate, bias_t)
self._start_satdump(sat_info, device_index, gain, sample_rate, bias_t, source_id)
logger.info(
f"Weather satellite capture started: {satellite} "
@@ -429,6 +434,7 @@ class WeatherSatDecoder:
gain: float,
sample_rate: int,
bias_t: bool,
source_id: str | None = None,
) -> None:
"""Start SatDump live capture and decode."""
# Create timestamped output directory for this capture
@@ -439,9 +445,9 @@ class WeatherSatDecoder:
freq_hz = int(sat_info['frequency'] * 1_000_000)
# SatDump v1.2+ uses string source_id (device serial) not numeric index.
# Auto-detect serial by querying rtl_eeprom, fall back to string index.
source_id = self._resolve_device_id(device_index)
# Use pre-resolved source_id, or fall back to resolving now
if source_id is None:
source_id = self._resolve_device_id(device_index)
cmd = [
'satdump', 'live',
@@ -465,18 +471,18 @@ class WeatherSatDecoder:
master_fd, slave_fd = pty.openpty()
self._pty_master_fd = master_fd
self._process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=slave_fd,
stdin=subprocess.DEVNULL,
close_fds=True,
)
register_process(self._process)
try:
os.close(slave_fd) # parent doesn't need the slave side
except OSError:
pass
self._process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=slave_fd,
stdin=subprocess.DEVNULL,
close_fds=True,
)
register_process(self._process)
try:
os.close(slave_fd) # parent doesn't need the slave side
except OSError:
pass
# Check for early exit asynchronously (avoid blocking /start for 3s)
def _check_early_exit():
@@ -568,18 +574,18 @@ class WeatherSatDecoder:
master_fd, slave_fd = pty.openpty()
self._pty_master_fd = master_fd
self._process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=slave_fd,
stdin=subprocess.DEVNULL,
close_fds=True,
)
register_process(self._process)
try:
os.close(slave_fd) # parent doesn't need the slave side
except OSError:
pass
self._process = subprocess.Popen(
cmd,
stdout=slave_fd,
stderr=slave_fd,
stdin=subprocess.DEVNULL,
close_fds=True,
)
register_process(self._process)
try:
os.close(slave_fd) # parent doesn't need the slave side
except OSError:
pass
# For offline mode, don't check for early exit — file decoding
# may complete very quickly and exit code 0 is normal success.
@@ -812,20 +818,23 @@ class WeatherSatDecoder:
# Signal watcher thread to do final scan and exit
self._stop_event.set()
# Process ended — release resources
was_running = self._running
self._running = False
# Acquire lock when modifying shared state to avoid racing
# with stop() which may have already cleaned up.
with self._lock:
was_running = self._running
self._running = False
process = self._process
elapsed = int(time.time() - self._capture_start_time) if self._capture_start_time else 0
if was_running:
# Collect exit status (returncode is only set after poll/wait)
if self._process and self._process.returncode is None:
if process and process.returncode is None:
try:
self._process.wait(timeout=5)
process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait()
retcode = self._process.returncode if self._process else None
process.kill()
process.wait()
retcode = process.returncode if process else None
if retcode and retcode != 0:
self._capture_phase = 'error'
self._emit_progress(CaptureProgress(
@@ -899,24 +908,24 @@ class WeatherSatDecoder:
except OSError:
continue
# Determine product type from filename/path
product = self._parse_product_name(filepath)
# Copy image to main output dir for serving
safe_sat = re.sub(r'[^A-Za-z0-9_-]+', '_', self._current_satellite).strip('_') or 'satellite'
safe_stem = re.sub(r'[^A-Za-z0-9_-]+', '_', filepath.stem).strip('_') or 'image'
suffix = filepath.suffix.lower()
if suffix not in ('.png', '.jpg', '.jpeg'):
suffix = '.png'
serve_name = (
f"{safe_sat}_{safe_stem}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
f"{suffix}"
)
serve_path = self._output_dir / serve_name
try:
shutil.copy2(filepath, serve_path)
except OSError:
# Copy failed — don't mark as known so it can be retried
# Determine product type from filename/path
product = self._parse_product_name(filepath)
# Copy image to main output dir for serving
safe_sat = re.sub(r'[^A-Za-z0-9_-]+', '_', self._current_satellite).strip('_') or 'satellite'
safe_stem = re.sub(r'[^A-Za-z0-9_-]+', '_', filepath.stem).strip('_') or 'image'
suffix = filepath.suffix.lower()
if suffix not in ('.png', '.jpg', '.jpeg'):
suffix = '.png'
serve_name = (
f"{safe_sat}_{safe_stem}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
f"{suffix}"
)
serve_path = self._output_dir / serve_name
try:
shutil.copy2(filepath, serve_path)
except OSError:
# Copy failed — don't mark as known so it can be retried
continue
# Only mark as known after successful copy
@@ -960,12 +969,12 @@ class WeatherSatDecoder:
return 'Multispectral Analysis'
if 'thermal' in name or 'temp' in name:
return 'Thermal'
if 'ndvi' in name:
return 'NDVI Vegetation'
if 'channel' in name or 'ch' in name:
match = re.search(r'(?:channel|ch)[\s_-]*(\d+)', name)
if match:
return f'Channel {match.group(1)}'
if 'ndvi' in name:
return 'NDVI Vegetation'
if 'channel' in name or 'ch' in name:
match = re.search(r'(?:channel|ch)[\s_-]*(\d+)', name)
if match:
return f'Channel {match.group(1)}'
if 'avhrr' in name:
return 'AVHRR'
if 'msu' in name or 'mtvza' in name:
@@ -986,14 +995,16 @@ class WeatherSatDecoder:
self._running = False
self._stop_event.set()
self._close_pty()
process = self._process
self._process = None
elapsed = int(time.time() - self._capture_start_time) if self._capture_start_time else 0
logger.info(f"Weather satellite capture stopped after {elapsed}s")
self._device_index = -1
if self._process:
safe_terminate(self._process)
self._process = None
elapsed = int(time.time() - self._capture_start_time) if self._capture_start_time else 0
logger.info(f"Weather satellite capture stopped after {elapsed}s")
self._device_index = -1
# Terminate outside the lock so stop() returns quickly
# and doesn't block start() or other lock acquisitions
if process:
safe_terminate(process)
def get_images(self) -> list[WeatherSatImage]:
"""Get list of decoded images."""
@@ -1029,18 +1040,18 @@ class WeatherSatDecoder:
sat_info = WEATHER_SATELLITES.get(satellite, {})
image = WeatherSatImage(
filename=filepath.name,
path=filepath,
satellite=satellite,
mode=sat_info.get('mode', 'Unknown'),
image = WeatherSatImage(
filename=filepath.name,
path=filepath,
satellite=satellite,
mode=sat_info.get('mode', 'Unknown'),
timestamp=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
frequency=sat_info.get('frequency', 0.0),
size_bytes=stat.st_size,
product=self._parse_product_name(filepath),
)
self._images.append(image)
known_filenames.add(filepath.name)
size_bytes=stat.st_size,
product=self._parse_product_name(filepath),
)
self._images.append(image)
known_filenames.add(filepath.name)
def delete_image(self, filename: str) -> bool:
"""Delete a decoded image."""
+41 -21
View File
@@ -299,14 +299,7 @@ class WeFaxDecoder:
try:
self._running = True
self._last_error = ''
self._start_pipeline()
logger.info(
f"WeFax decoder started: {frequency_khz} kHz, "
f"station={station}, IOC={ioc}, LPM={lpm}"
)
return True
self._start_pipeline_spawn()
except Exception as e:
self._running = False
self._last_error = str(e)
@@ -317,8 +310,32 @@ class WeFaxDecoder:
))
return False
# Health check sleep outside lock
try:
self._start_pipeline_health_check()
logger.info(
f"WeFax decoder started: {frequency_khz} kHz, "
f"station={station}, IOC={ioc}, LPM={lpm}"
)
return True
except Exception as e:
with self._lock:
self._running = False
self._last_error = str(e)
logger.error(f"Failed to start WeFax decoder: {e}")
self._emit_progress(WeFaxProgress(
status='error',
message=str(e),
))
return False
def _start_pipeline(self) -> None:
"""Start SDR FM demod subprocess in USB mode for WeFax."""
self._start_pipeline_spawn()
self._start_pipeline_health_check()
def _start_pipeline_spawn(self) -> None:
"""Spawn the SDR FM demod subprocess. Must hold self._lock."""
try:
sdr_type_enum = SDRType(self._sdr_type)
except ValueError:
@@ -361,21 +378,24 @@ class WeFaxDecoder:
stderr=subprocess.PIPE,
)
# Post-spawn health check — catch immediate failures
def _start_pipeline_health_check(self) -> None:
"""Post-spawn health check and decode thread start. Called outside lock."""
time.sleep(0.3)
if self._sdr_process.poll() is not None:
stderr_detail = ''
if self._sdr_process.stderr:
stderr_detail = self._sdr_process.stderr.read().decode(
errors='replace').strip()
rc = self._sdr_process.returncode
self._sdr_process = None
detail = stderr_detail.split('\n')[-1] if stderr_detail else f'exit code {rc}'
raise RuntimeError(f'{self._sdr_tool_name} failed: {detail}')
self._decode_thread = threading.Thread(
target=self._decode_audio_stream, daemon=True)
self._decode_thread.start()
with self._lock:
if self._sdr_process and self._sdr_process.poll() is not None:
stderr_detail = ''
if self._sdr_process.stderr:
stderr_detail = self._sdr_process.stderr.read().decode(
errors='replace').strip()
rc = self._sdr_process.returncode
self._sdr_process = None
detail = stderr_detail.split('\n')[-1] if stderr_detail else f'exit code {rc}'
raise RuntimeError(f'{self._sdr_tool_name} failed: {detail}')
self._decode_thread = threading.Thread(
target=self._decode_audio_stream, daemon=True)
self._decode_thread.start()
def _decode_audio_stream(self) -> None:
"""Read audio from SDR FM demod and decode WeFax images.