wefax: auto-align carrier frequencies for usb tuning

This commit is contained in:
Smittix
2026-02-24 23:20:09 +00:00
parent 4bf452d462
commit 5edfe1797c
5 changed files with 426 additions and 147 deletions

View File

@@ -15,7 +15,13 @@ from utils.logging import get_logger
from utils.sse import sse_stream_fanout
from utils.validation import validate_frequency
from utils.wefax import get_wefax_decoder
from utils.wefax_stations import get_current_broadcasts, get_station, load_stations
from utils.wefax_stations import (
WEFAX_USB_ALIGNMENT_OFFSET_KHZ,
get_current_broadcasts,
get_station,
load_stations,
resolve_tuning_frequency_khz,
)
logger = get_logger('intercept.wefax')
@@ -76,7 +82,8 @@ def start_decoder():
"gain": 40,
"ioc": 576,
"lpm": 120,
"direct_sampling": true
"direct_sampling": true,
"frequency_reference": "auto" // auto, carrier, or dial
}
"""
decoder = get_wefax_decoder()
@@ -121,6 +128,25 @@ def start_decoder():
ioc = int(data.get('ioc', 576))
lpm = int(data.get('lpm', 120))
direct_sampling = bool(data.get('direct_sampling', True))
frequency_reference = str(data.get('frequency_reference', 'auto')).strip().lower()
if not frequency_reference:
frequency_reference = 'auto'
try:
tuned_frequency_khz, resolved_reference, usb_offset_applied = (
resolve_tuning_frequency_khz(
listed_frequency_khz=frequency_khz,
station_callsign=station,
frequency_reference=frequency_reference,
)
)
tuned_mhz = tuned_frequency_khz / 1000.0
validate_frequency(tuned_mhz, min_mhz=2.0, max_mhz=30.0)
except ValueError as e:
return jsonify({
'status': 'error',
'message': f'Invalid frequency settings: {e}',
}), 400
# Validate IOC and LPM
if ioc not in (288, 576):
@@ -149,7 +175,7 @@ def start_decoder():
# Set callback and start
decoder.set_callback(_progress_callback)
success = decoder.start(
frequency_khz=frequency_khz,
frequency_khz=tuned_frequency_khz,
station=station,
device_index=device_int,
gain=gain,
@@ -163,6 +189,12 @@ def start_decoder():
return jsonify({
'status': 'started',
'frequency_khz': frequency_khz,
'tuned_frequency_khz': tuned_frequency_khz,
'frequency_reference': resolved_reference,
'usb_offset_applied': usb_offset_applied,
'usb_offset_khz': (
WEFAX_USB_ALIGNMENT_OFFSET_KHZ if usb_offset_applied else 0.0
),
'station': station,
'ioc': ioc,
'lpm': lpm,
@@ -297,7 +329,8 @@ def enable_schedule():
"gain": 40,
"ioc": 576,
"lpm": 120,
"direct_sampling": true
"direct_sampling": true,
"frequency_reference": "auto" // auto, carrier, or dial
}
Returns:
@@ -336,6 +369,25 @@ def enable_schedule():
ioc = int(data.get('ioc', 576))
lpm = int(data.get('lpm', 120))
direct_sampling = bool(data.get('direct_sampling', True))
frequency_reference = str(data.get('frequency_reference', 'auto')).strip().lower()
if not frequency_reference:
frequency_reference = 'auto'
try:
tuned_frequency_khz, resolved_reference, usb_offset_applied = (
resolve_tuning_frequency_khz(
listed_frequency_khz=frequency_khz,
station_callsign=station,
frequency_reference=frequency_reference,
)
)
tuned_mhz = tuned_frequency_khz / 1000.0
validate_frequency(tuned_mhz, min_mhz=2.0, max_mhz=30.0)
except ValueError as e:
return jsonify({
'status': 'error',
'message': f'Invalid frequency settings: {e}',
}), 400
scheduler = get_wefax_scheduler()
scheduler.set_callbacks(_progress_callback, _scheduler_event_callback)
@@ -343,7 +395,7 @@ def enable_schedule():
try:
result = scheduler.enable(
station=station,
frequency_khz=frequency_khz,
frequency_khz=tuned_frequency_khz,
device=device,
gain=gain,
ioc=ioc,
@@ -357,7 +409,17 @@ def enable_schedule():
'message': 'Failed to enable scheduler',
}), 500
return jsonify({'status': 'ok', **result})
return jsonify({
'status': 'ok',
**result,
'frequency_khz': frequency_khz,
'tuned_frequency_khz': tuned_frequency_khz,
'frequency_reference': resolved_reference,
'usb_offset_applied': usb_offset_applied,
'usb_offset_khz': (
WEFAX_USB_ALIGNMENT_OFFSET_KHZ if usb_offset_applied else 0.0
),
})
@wefax_bp.route('/schedule/disable', methods=['POST'])

View File

@@ -152,6 +152,14 @@ var WeFax = (function () {
// ---- Start / Stop ----
function selectedFrequencyReference() {
var alignCheckbox = document.getElementById('wefaxAutoUsbAlign');
if (alignCheckbox && !alignCheckbox.checked) {
return 'dial';
}
return 'auto';
}
function start() {
if (state.running) return;
@@ -180,6 +188,7 @@ var WeFax = (function () {
ioc: iocSel ? parseInt(iocSel.value, 10) : 576,
lpm: lpmSel ? parseInt(lpmSel.value, 10) : 120,
direct_sampling: dsCheckbox ? dsCheckbox.checked : true,
frequency_reference: selectedFrequencyReference(),
};
fetch('/wefax/start', {
@@ -190,10 +199,16 @@ var WeFax = (function () {
.then(function (r) { return r.json(); })
.then(function (data) {
if (data.status === 'started' || data.status === 'already_running') {
var tunedKhz = Number(data.tuned_frequency_khz);
if (isNaN(tunedKhz) || tunedKhz <= 0) tunedKhz = freqKhz;
state.running = true;
updateButtons(true);
setStatus('Scanning ' + freqKhz + ' kHz...');
setStripFreq(freqKhz);
if (data.usb_offset_applied) {
setStatus('Scanning ' + tunedKhz + ' kHz (USB aligned from ' + freqKhz + ' kHz)...');
} else {
setStatus('Scanning ' + tunedKhz + ' kHz...');
}
setStripFreq(tunedKhz);
connectSSE();
} else {
var errMsg = data.message || 'unknown error';
@@ -1051,12 +1066,17 @@ var WeFax = (function () {
ioc: iocSel ? parseInt(iocSel.value, 10) : 576,
lpm: lpmSel ? parseInt(lpmSel.value, 10) : 120,
direct_sampling: dsCheckbox ? dsCheckbox.checked : true,
frequency_reference: selectedFrequencyReference(),
}),
})
.then(function (r) { return r.json(); })
.then(function (data) {
if (data.status === 'ok') {
setStatus('Auto-capture enabled — ' + (data.scheduled_count || 0) + ' broadcasts scheduled');
var status = 'Auto-capture enabled — ' + (data.scheduled_count || 0) + ' broadcasts scheduled';
if (data.usb_offset_applied && !isNaN(Number(data.tuned_frequency_khz))) {
status += ' (tuning ' + Number(data.tuned_frequency_khz) + ' kHz)';
}
setStatus(status);
syncSchedulerCheckboxes(true);
state.schedulerEnabled = true;
connectSSE();

View File

@@ -48,6 +48,13 @@
<input type="checkbox" id="wefaxDirectSampling" checked>
<label for="wefaxDirectSampling" style="margin: 0; cursor: pointer;">Direct Sampling (Q-branch, required for HF)</label>
</div>
<div class="form-group" style="display: flex; align-items: center; gap: 8px;">
<input type="checkbox" id="wefaxAutoUsbAlign" checked>
<label for="wefaxAutoUsbAlign" style="margin: 0; cursor: pointer;">Auto USB align listed carrier frequencies (-1.9 kHz)</label>
</div>
<p class="info-text" style="font-size: 11px; color: var(--text-dim); margin-top: -4px;">
Disable this if your source already provides USB dial frequencies.
</p>
</div>
<div class="section">

View File

@@ -59,6 +59,62 @@ class TestWeFaxStations:
from utils.wefax_stations import get_station
assert get_station('XXXXX') is None
def test_resolve_tuning_frequency_auto_uses_carrier_for_known_station(self):
"""Known station frequencies default to carrier-list behavior in auto mode."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='auto',
)
assert math.isclose(tuned, 4296.1, abs_tol=1e-6)
assert reference == 'carrier'
assert offset_applied is True
def test_resolve_tuning_frequency_auto_preserves_unknown_station_input(self):
"""Ad-hoc frequencies (no station metadata) should be treated as dial."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='',
frequency_reference='auto',
)
assert math.isclose(tuned, 4298.0, abs_tol=1e-6)
assert reference == 'dial'
assert offset_applied is False
def test_resolve_tuning_frequency_dial_override(self):
"""Explicit dial reference must bypass USB alignment."""
from utils.wefax_stations import resolve_tuning_frequency_khz
tuned, reference, offset_applied = resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='dial',
)
assert math.isclose(tuned, 4298.0, abs_tol=1e-6)
assert reference == 'dial'
assert offset_applied is False
def test_resolve_tuning_frequency_rejects_invalid_reference(self):
"""Invalid frequency reference values should raise a validation error."""
from utils.wefax_stations import resolve_tuning_frequency_khz
try:
resolve_tuning_frequency_khz(
listed_frequency_khz=4298.0,
station_callsign='NOJ',
frequency_reference='invalid',
)
assert False, "Expected ValueError for invalid frequency_reference"
except ValueError as exc:
assert 'frequency_reference' in str(exc)
def test_station_frequencies_have_khz(self):
"""Each frequency entry must have 'khz' and 'description'."""
from utils.wefax_stations import load_stations
@@ -359,8 +415,42 @@ class TestWeFaxRoutes:
data = response.get_json()
assert data['status'] == 'started'
assert data['frequency_khz'] == 4298
assert data['usb_offset_applied'] is True
assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6)
assert data['frequency_reference'] == 'carrier'
assert data['station'] == 'NOJ'
mock_decoder.start.assert_called_once()
start_kwargs = mock_decoder.start.call_args.kwargs
assert math.isclose(start_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6)
def test_start_respects_dial_reference_override(self, client):
"""POST /wefax/start with dial reference should not apply USB offset."""
_login_session(client)
mock_decoder = MagicMock()
mock_decoder.is_running = False
mock_decoder.start.return_value = True
with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder), \
patch('routes.wefax.app_module.claim_sdr_device', return_value=None):
response = client.post(
'/wefax/start',
data=json.dumps({
'frequency_khz': 4298,
'station': 'NOJ',
'device': 0,
'frequency_reference': 'dial',
}),
content_type='application/json',
)
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'started'
assert data['usb_offset_applied'] is False
assert math.isclose(data['tuned_frequency_khz'], 4298.0, abs_tol=1e-6)
assert data['frequency_reference'] == 'dial'
start_kwargs = mock_decoder.start.call_args.kwargs
assert math.isclose(start_kwargs['frequency_khz'], 4298.0, abs_tol=1e-6)
def test_start_device_busy(self, client):
"""POST /wefax/start should return 409 when device is busy."""
@@ -429,6 +519,35 @@ class TestWeFaxRoutes:
assert response.status_code == 400
def test_schedule_enable_applies_usb_alignment(self, client):
"""Scheduler should receive tuned USB dial frequency in auto mode."""
_login_session(client)
mock_scheduler = MagicMock()
mock_scheduler.enable.return_value = {
'enabled': True,
'scheduled_count': 2,
'total_broadcasts': 2,
}
with patch('utils.wefax_scheduler.get_wefax_scheduler', return_value=mock_scheduler):
response = client.post(
'/wefax/schedule/enable',
data=json.dumps({
'station': 'NOJ',
'frequency_khz': 4298,
'device': 0,
}),
content_type='application/json',
)
assert response.status_code == 200
data = response.get_json()
assert data['status'] == 'ok'
assert data['usb_offset_applied'] is True
assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6)
enable_kwargs = mock_scheduler.enable.call_args.kwargs
assert math.isclose(enable_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6)
class TestWeFaxProgressCallback:
"""Regression tests for WeFax route-level progress callback behavior."""

View File

@@ -12,6 +12,8 @@ from pathlib import Path
_stations_cache: list[dict] | None = None
_stations_by_callsign: dict[str, dict] = {}
_VALID_FREQUENCY_REFERENCES = {'auto', 'carrier', 'dial'}
WEFAX_USB_ALIGNMENT_OFFSET_KHZ = 1.9
_STATIONS_PATH = Path(__file__).resolve().parent.parent / 'data' / 'wefax_stations.json'
@@ -37,6 +39,75 @@ def get_station(callsign: str) -> dict | None:
return _stations_by_callsign.get(callsign.upper())
def _normalize_frequency_reference(value: str | None) -> str:
"""Normalize and validate frequency reference token."""
reference = str(value or 'auto').strip().lower()
if reference not in _VALID_FREQUENCY_REFERENCES:
choices = ', '.join(sorted(_VALID_FREQUENCY_REFERENCES))
raise ValueError(f'frequency_reference must be one of: {choices}')
return reference
def _station_frequency_reference(station: dict, listed_frequency_khz: float) -> str:
"""Infer whether a station frequency entry is carrier or already USB dial."""
for entry in station.get('frequencies', []):
try:
entry_khz = float(entry.get('khz'))
except (TypeError, ValueError):
continue
if abs(entry_khz - listed_frequency_khz) > 0.001:
continue
entry_ref = str(entry.get('reference', '')).strip().lower()
if entry_ref in ('carrier', 'dial'):
return entry_ref
station_ref = str(station.get('frequency_reference', '')).strip().lower()
if station_ref in ('carrier', 'dial'):
return station_ref
# Most published marine WeFax channel lists are carrier frequencies.
return 'carrier'
def resolve_tuning_frequency_khz(
listed_frequency_khz: float,
station_callsign: str = '',
frequency_reference: str = 'auto',
) -> tuple[float, str, bool]:
"""Resolve listed frequency to the actual USB dial frequency.
Args:
listed_frequency_khz: Frequency value provided by UI/API.
station_callsign: Station callsign used for metadata lookup.
frequency_reference: One of auto/carrier/dial.
Returns:
(tuned_frequency_khz, resolved_reference, offset_applied)
"""
listed = float(listed_frequency_khz)
if listed <= 0:
raise ValueError('frequency_khz must be greater than zero')
requested_ref = _normalize_frequency_reference(frequency_reference)
resolved_ref = requested_ref
if requested_ref == 'auto':
station = get_station(station_callsign) if station_callsign else None
if station:
resolved_ref = _station_frequency_reference(station, listed)
else:
# For ad-hoc frequencies (no station metadata), treat input as dial.
resolved_ref = 'dial'
if resolved_ref == 'carrier':
tuned = round(listed - WEFAX_USB_ALIGNMENT_OFFSET_KHZ, 3)
if tuned <= 0:
raise ValueError('frequency_khz too low after USB alignment offset')
return tuned, resolved_ref, True
return listed, resolved_ref, False
def get_current_broadcasts(callsign: str) -> list[dict]:
"""Return schedule entries closest to the current UTC time.