Remove legacy RF modes and add SignalID route/tests

This commit is contained in:
Smittix
2026-02-23 13:34:00 +00:00
parent 5f480caa3f
commit 7ea06caaa2
35 changed files with 3883 additions and 6920 deletions
+33 -33
View File
@@ -2,40 +2,40 @@
def register_blueprints(app):
"""Register all route blueprints with the Flask app."""
from .pager import pager_bp
from .sensor import sensor_bp
from .rtlamr import rtlamr_bp
from .wifi import wifi_bp
from .wifi_v2 import wifi_v2_bp
from .bluetooth import bluetooth_bp
from .bluetooth_v2 import bluetooth_v2_bp
from .acars import acars_bp
from .adsb import adsb_bp
from .ais import ais_bp
from .dsc import dsc_bp
from .acars import acars_bp
from .vdl2 import vdl2_bp
from .aprs import aprs_bp
from .satellite import satellite_bp
from .gps import gps_bp
from .settings import settings_bp
from .correlation import correlation_bp
from .listening_post import listening_post_bp
from .meshtastic import meshtastic_bp
from .tscm import tscm_bp, init_tscm_state
from .spy_stations import spy_stations_bp
from .controller import controller_bp
from .offline import offline_bp
from .updater import updater_bp
from .sstv import sstv_bp
from .weather_sat import weather_sat_bp
from .sstv_general import sstv_general_bp
from .websdr import websdr_bp
from .alerts import alerts_bp
from .recordings import recordings_bp
from .subghz import subghz_bp
from .aprs import aprs_bp
from .bluetooth import bluetooth_bp
from .bluetooth_v2 import bluetooth_v2_bp
from .bt_locate import bt_locate_bp
from .controller import controller_bp
from .correlation import correlation_bp
from .dsc import dsc_bp
from .gps import gps_bp
from .listening_post import receiver_bp
from .meshtastic import meshtastic_bp
from .offline import offline_bp
from .pager import pager_bp
from .recordings import recordings_bp
from .rtlamr import rtlamr_bp
from .satellite import satellite_bp
from .sensor import sensor_bp
from .settings import settings_bp
from .signalid import signalid_bp
from .space_weather import space_weather_bp
from .fingerprint import fingerprint_bp
from .spy_stations import spy_stations_bp
from .sstv import sstv_bp
from .sstv_general import sstv_general_bp
from .subghz import subghz_bp
from .tscm import init_tscm_state, tscm_bp
from .updater import updater_bp
from .vdl2 import vdl2_bp
from .weather_sat import weather_sat_bp
from .websdr import websdr_bp
from .wifi import wifi_bp
from .wifi_v2 import wifi_v2_bp
app.register_blueprint(pager_bp)
app.register_blueprint(sensor_bp)
@@ -54,7 +54,7 @@ def register_blueprints(app):
app.register_blueprint(gps_bp)
app.register_blueprint(settings_bp)
app.register_blueprint(correlation_bp)
app.register_blueprint(listening_post_bp)
app.register_blueprint(receiver_bp)
app.register_blueprint(meshtastic_bp)
app.register_blueprint(tscm_bp)
app.register_blueprint(spy_stations_bp)
@@ -68,9 +68,9 @@ def register_blueprints(app):
app.register_blueprint(alerts_bp) # Cross-mode alerts
app.register_blueprint(recordings_bp) # Session recordings
app.register_blueprint(subghz_bp) # SubGHz transceiver (HackRF)
app.register_blueprint(bt_locate_bp) # BT Locate SAR device tracking
app.register_blueprint(space_weather_bp) # Space weather monitoring
app.register_blueprint(fingerprint_bp) # RF fingerprinting
app.register_blueprint(bt_locate_bp) # BT Locate SAR device tracking
app.register_blueprint(space_weather_bp) # Space weather monitoring
app.register_blueprint(signalid_bp) # External signal ID enrichment
# Initialize TSCM state with queue and lock from app
import app as app_module
-113
View File
@@ -1,113 +0,0 @@
"""RF Fingerprinting CRUD + compare API."""
from __future__ import annotations
import os
import threading
from flask import Blueprint, jsonify, request
fingerprint_bp = Blueprint("fingerprint", __name__, url_prefix="/fingerprint")
_fingerprinter = None
_fingerprinter_lock = threading.Lock()
_active_session_id: int | None = None
_session_lock = threading.Lock()
def _get_fingerprinter():
global _fingerprinter
if _fingerprinter is None:
with _fingerprinter_lock:
if _fingerprinter is None:
from utils.rf_fingerprint import RFFingerprinter
db_path = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "instance", "rf_fingerprints.db"
)
os.makedirs(os.path.dirname(db_path), exist_ok=True)
_fingerprinter = RFFingerprinter(db_path)
return _fingerprinter
@fingerprint_bp.route("/start", methods=["POST"])
def start_session():
global _active_session_id
data = request.get_json(force=True) or {}
name = data.get("name", "Unnamed Session")
location = data.get("location")
fp = _get_fingerprinter()
with _session_lock:
if _active_session_id is not None:
return jsonify({"error": "Session already active", "session_id": _active_session_id}), 409
session_id = fp.start_session(name, location)
_active_session_id = session_id
return jsonify({"session_id": session_id, "name": name})
@fingerprint_bp.route("/stop", methods=["POST"])
def stop_session():
global _active_session_id
fp = _get_fingerprinter()
with _session_lock:
if _active_session_id is None:
return jsonify({"error": "No active session"}), 400
session_id = _active_session_id
result = fp.finalize(session_id)
_active_session_id = None
return jsonify(result)
@fingerprint_bp.route("/observation", methods=["POST"])
def add_observation():
global _active_session_id
fp = _get_fingerprinter()
data = request.get_json(force=True) or {}
observations = data.get("observations", [])
with _session_lock:
session_id = _active_session_id
if session_id is None:
return jsonify({"error": "No active session"}), 400
if not observations:
return jsonify({"added": 0})
fp.add_observations_batch(session_id, observations)
return jsonify({"added": len(observations)})
@fingerprint_bp.route("/list", methods=["GET"])
def list_sessions():
fp = _get_fingerprinter()
sessions = fp.list_sessions()
with _session_lock:
active_id = _active_session_id
return jsonify({"sessions": sessions, "active_session_id": active_id})
@fingerprint_bp.route("/compare", methods=["POST"])
def compare():
fp = _get_fingerprinter()
data = request.get_json(force=True) or {}
baseline_id = data.get("baseline_id")
observations = data.get("observations", [])
if not baseline_id:
return jsonify({"error": "baseline_id required"}), 400
anomalies = fp.compare(int(baseline_id), observations)
bands = fp.get_baseline_bands(int(baseline_id))
return jsonify({"anomalies": anomalies, "baseline_bands": bands})
@fingerprint_bp.route("/<int:session_id>", methods=["DELETE"])
def delete_session(session_id: int):
global _active_session_id
fp = _get_fingerprinter()
with _session_lock:
if _active_session_id == session_id:
_active_session_id = None
fp.delete_session(session_id)
return jsonify({"deleted": session_id})
@fingerprint_bp.route("/status", methods=["GET"])
def session_status():
with _session_lock:
active_id = _active_session_id
return jsonify({"active_session_id": active_id})
+53 -53
View File
@@ -1,4 +1,4 @@
"""Listening Post routes for radio monitoring and frequency scanning."""
"""Receiver routes for radio monitoring and frequency scanning."""
from __future__ import annotations
@@ -29,9 +29,9 @@ from utils.constants import (
)
from utils.sdr import SDRFactory, SDRType
logger = get_logger('intercept.listening_post')
listening_post_bp = Blueprint('listening_post', __name__, url_prefix='/listening')
logger = get_logger('intercept.receiver')
receiver_bp = Blueprint('receiver', __name__, url_prefix='/receiver')
# ============================================
# GLOBAL STATE
@@ -53,7 +53,7 @@ scanner_lock = threading.Lock()
scanner_paused = False
scanner_current_freq = 0.0
scanner_active_device: Optional[int] = None
listening_active_device: Optional[int] = None
receiver_active_device: Optional[int] = None
scanner_power_process: Optional[subprocess.Popen] = None
scanner_config = {
'start_freq': 88.0,
@@ -941,7 +941,7 @@ def _stop_audio_stream_internal():
# API ENDPOINTS
# ============================================
@listening_post_bp.route('/tools')
@receiver_bp.route('/tools')
def check_tools() -> Response:
"""Check for required tools."""
rtl_fm = find_rtl_fm()
@@ -967,10 +967,10 @@ def check_tools() -> Response:
})
@listening_post_bp.route('/scanner/start', methods=['POST'])
@receiver_bp.route('/scanner/start', methods=['POST'])
def start_scanner() -> Response:
"""Start the frequency scanner."""
global scanner_thread, scanner_running, scanner_config, scanner_active_device, listening_active_device
global scanner_thread, scanner_running, scanner_config, scanner_active_device, receiver_active_device
with scanner_lock:
if scanner_running:
@@ -1036,9 +1036,9 @@ def start_scanner() -> Response:
'message': 'rtl_power not found. Install rtl-sdr tools.'
}), 503
# Release listening device if active
if listening_active_device is not None:
app_module.release_sdr_device(listening_active_device)
listening_active_device = None
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
receiver_active_device = None
# Claim device for scanner
error = app_module.claim_sdr_device(scanner_config['device'], 'scanner')
if error:
@@ -1064,9 +1064,9 @@ def start_scanner() -> Response:
'status': 'error',
'message': f'rx_fm not found. Install SoapySDR utilities for {sdr_type}.'
}), 503
if listening_active_device is not None:
app_module.release_sdr_device(listening_active_device)
listening_active_device = None
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
receiver_active_device = None
error = app_module.claim_sdr_device(scanner_config['device'], 'scanner')
if error:
return jsonify({
@@ -1086,7 +1086,7 @@ def start_scanner() -> Response:
})
@listening_post_bp.route('/scanner/stop', methods=['POST'])
@receiver_bp.route('/scanner/stop', methods=['POST'])
def stop_scanner() -> Response:
"""Stop the frequency scanner."""
global scanner_running, scanner_active_device, scanner_power_process
@@ -1110,7 +1110,7 @@ def stop_scanner() -> Response:
return jsonify({'status': 'stopped'})
@listening_post_bp.route('/scanner/pause', methods=['POST'])
@receiver_bp.route('/scanner/pause', methods=['POST'])
def pause_scanner() -> Response:
"""Pause/resume the scanner."""
global scanner_paused
@@ -1132,7 +1132,7 @@ def pause_scanner() -> Response:
scanner_skip_signal = False
@listening_post_bp.route('/scanner/skip', methods=['POST'])
@receiver_bp.route('/scanner/skip', methods=['POST'])
def skip_signal() -> Response:
"""Skip current signal and continue scanning."""
global scanner_skip_signal
@@ -1152,7 +1152,7 @@ def skip_signal() -> Response:
})
@listening_post_bp.route('/scanner/config', methods=['POST'])
@receiver_bp.route('/scanner/config', methods=['POST'])
def update_scanner_config() -> Response:
"""Update scanner config while running (step, squelch, gain, dwell)."""
data = request.json or {}
@@ -1194,7 +1194,7 @@ def update_scanner_config() -> Response:
})
@listening_post_bp.route('/scanner/status')
@receiver_bp.route('/scanner/status')
def scanner_status() -> Response:
"""Get scanner status."""
return jsonify({
@@ -1207,16 +1207,16 @@ def scanner_status() -> Response:
})
@listening_post_bp.route('/scanner/stream')
@receiver_bp.route('/scanner/stream')
def stream_scanner_events() -> Response:
"""SSE stream for scanner events."""
def _on_msg(msg: dict[str, Any]) -> None:
process_event('listening_scanner', msg, msg.get('type'))
process_event('receiver_scanner', msg, msg.get('type'))
response = Response(
sse_stream_fanout(
source_queue=scanner_queue,
channel_key='listening_scanner',
channel_key='receiver_scanner',
timeout=SSE_QUEUE_TIMEOUT,
keepalive_interval=SSE_KEEPALIVE_INTERVAL,
on_message=_on_msg,
@@ -1228,7 +1228,7 @@ def stream_scanner_events() -> Response:
return response
@listening_post_bp.route('/scanner/log')
@receiver_bp.route('/scanner/log')
def get_activity_log() -> Response:
"""Get activity log."""
limit = request.args.get('limit', 100, type=int)
@@ -1239,7 +1239,7 @@ def get_activity_log() -> Response:
})
@listening_post_bp.route('/scanner/log/clear', methods=['POST'])
@receiver_bp.route('/scanner/log/clear', methods=['POST'])
def clear_activity_log() -> Response:
"""Clear activity log."""
with activity_log_lock:
@@ -1247,7 +1247,7 @@ def clear_activity_log() -> Response:
return jsonify({'status': 'cleared'})
@listening_post_bp.route('/presets')
@receiver_bp.route('/presets')
def get_presets() -> Response:
"""Get scanner presets."""
presets = [
@@ -1267,10 +1267,10 @@ def get_presets() -> Response:
# MANUAL AUDIO ENDPOINTS (for direct listening)
# ============================================
@listening_post_bp.route('/audio/start', methods=['POST'])
@receiver_bp.route('/audio/start', methods=['POST'])
def start_audio() -> Response:
"""Start audio at specific frequency (manual mode)."""
global scanner_running, scanner_active_device, listening_active_device, scanner_power_process, scanner_thread
global scanner_running, scanner_active_device, receiver_active_device, scanner_power_process, scanner_thread
global audio_running, audio_frequency, audio_modulation, audio_source
# Stop scanner if running
@@ -1363,9 +1363,9 @@ def start_audio() -> Response:
audio_modulation = modulation
audio_source = 'waterfall'
# Shared monitor uses the waterfall's existing SDR claim.
if listening_active_device is not None:
app_module.release_sdr_device(listening_active_device)
listening_active_device = None
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
receiver_active_device = None
return jsonify({
'status': 'started',
'frequency': frequency,
@@ -1385,15 +1385,15 @@ def start_audio() -> Response:
# may still be tearing down its IQ capture process (thread join +
# safe_terminate can take several seconds), so we retry with back-off
# to give the USB device time to be fully released.
if listening_active_device is None or listening_active_device != device:
if listening_active_device is not None:
app_module.release_sdr_device(listening_active_device)
listening_active_device = None
if receiver_active_device is None or receiver_active_device != device:
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
receiver_active_device = None
error = None
max_claim_attempts = 6
for attempt in range(max_claim_attempts):
error = app_module.claim_sdr_device(device, 'listening')
error = app_module.claim_sdr_device(device, 'receiver')
if not error:
break
if attempt < max_claim_attempts - 1:
@@ -1409,7 +1409,7 @@ def start_audio() -> Response:
'error_type': 'DEVICE_BUSY',
'message': error
}), 409
listening_active_device = device
receiver_active_device = device
_start_audio_stream(frequency, modulation)
@@ -1423,9 +1423,9 @@ def start_audio() -> Response:
})
else:
# Avoid leaving a stale device claim after startup failure.
if listening_active_device is not None:
app_module.release_sdr_device(listening_active_device)
listening_active_device = None
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
receiver_active_device = None
start_error = ''
for log_path in ('/tmp/rtl_fm_stderr.log', '/tmp/ffmpeg_stderr.log'):
@@ -1447,18 +1447,18 @@ def start_audio() -> Response:
}), 500
@listening_post_bp.route('/audio/stop', methods=['POST'])
@receiver_bp.route('/audio/stop', methods=['POST'])
def stop_audio() -> Response:
"""Stop audio."""
global listening_active_device
global receiver_active_device
_stop_audio_stream()
if listening_active_device is not None:
app_module.release_sdr_device(listening_active_device)
listening_active_device = None
if receiver_active_device is not None:
app_module.release_sdr_device(receiver_active_device)
receiver_active_device = None
return jsonify({'status': 'stopped'})
@listening_post_bp.route('/audio/status')
@receiver_bp.route('/audio/status')
def audio_status() -> Response:
"""Get audio status."""
running = audio_running
@@ -1479,7 +1479,7 @@ def audio_status() -> Response:
})
@listening_post_bp.route('/audio/debug')
@receiver_bp.route('/audio/debug')
def audio_debug() -> Response:
"""Get audio debug status and recent stderr logs."""
rtl_log_path = '/tmp/rtl_fm_stderr.log'
@@ -1519,7 +1519,7 @@ def audio_debug() -> Response:
})
@listening_post_bp.route('/audio/probe')
@receiver_bp.route('/audio/probe')
def audio_probe() -> Response:
"""Grab a small chunk of audio bytes from the pipeline for debugging."""
global audio_process
@@ -1559,7 +1559,7 @@ def audio_probe() -> Response:
return jsonify({'status': 'ok', 'bytes': size})
@listening_post_bp.route('/audio/stream')
@receiver_bp.route('/audio/stream')
def stream_audio() -> Response:
"""Stream WAV audio."""
if audio_source == 'waterfall':
@@ -1682,7 +1682,7 @@ def stream_audio() -> Response:
# SIGNAL IDENTIFICATION ENDPOINT
# ============================================
@listening_post_bp.route('/signal/guess', methods=['POST'])
@receiver_bp.route('/signal/guess', methods=['POST'])
def guess_signal() -> Response:
"""Identify a signal based on frequency, modulation, and other parameters."""
data = request.json or {}
@@ -1962,7 +1962,7 @@ def _stop_waterfall_internal() -> None:
waterfall_active_device = None
@listening_post_bp.route('/waterfall/start', methods=['POST'])
@receiver_bp.route('/waterfall/start', methods=['POST'])
def start_waterfall() -> Response:
"""Start the waterfall/spectrogram display."""
global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device
@@ -2023,7 +2023,7 @@ def start_waterfall() -> Response:
return jsonify({'status': 'started', 'config': waterfall_config})
@listening_post_bp.route('/waterfall/stop', methods=['POST'])
@receiver_bp.route('/waterfall/stop', methods=['POST'])
def stop_waterfall() -> Response:
"""Stop the waterfall display."""
_stop_waterfall_internal()
@@ -2031,7 +2031,7 @@ def stop_waterfall() -> Response:
return jsonify({'status': 'stopped'})
@listening_post_bp.route('/waterfall/stream')
@receiver_bp.route('/waterfall/stream')
def stream_waterfall() -> Response:
"""SSE stream for waterfall data."""
def _on_msg(msg: dict[str, Any]) -> None:
@@ -2040,7 +2040,7 @@ def stream_waterfall() -> Response:
response = Response(
sse_stream_fanout(
source_queue=waterfall_queue,
channel_key='listening_waterfall',
channel_key='receiver_waterfall',
timeout=SSE_QUEUE_TIMEOUT,
keepalive_interval=SSE_KEEPALIVE_INTERVAL,
on_message=_on_msg,
+352
View File
@@ -0,0 +1,352 @@
"""Signal identification enrichment routes (SigID Wiki proxy lookup)."""
from __future__ import annotations
import json
import time
import urllib.parse
import urllib.request
from typing import Any
from flask import Blueprint, Response, jsonify, request
from utils.logging import get_logger
logger = get_logger('intercept.signalid')
signalid_bp = Blueprint('signalid', __name__, url_prefix='/signalid')
SIGID_API_URL = 'https://www.sigidwiki.com/api.php'
SIGID_USER_AGENT = 'INTERCEPT-SignalID/1.0'
SIGID_TIMEOUT_SECONDS = 12
SIGID_CACHE_TTL_SECONDS = 600
_cache: dict[str, dict[str, Any]] = {}
def _cache_get(key: str) -> Any | None:
entry = _cache.get(key)
if not entry:
return None
if time.time() >= entry['expires']:
_cache.pop(key, None)
return None
return entry['data']
def _cache_set(key: str, data: Any, ttl_seconds: int = SIGID_CACHE_TTL_SECONDS) -> None:
_cache[key] = {
'data': data,
'expires': time.time() + ttl_seconds,
}
def _fetch_api_json(params: dict[str, str]) -> dict[str, Any] | None:
query = urllib.parse.urlencode(params, doseq=True)
url = f'{SIGID_API_URL}?{query}'
req = urllib.request.Request(url, headers={'User-Agent': SIGID_USER_AGENT})
try:
with urllib.request.urlopen(req, timeout=SIGID_TIMEOUT_SECONDS) as resp:
payload = resp.read().decode('utf-8', errors='replace')
data = json.loads(payload)
except Exception as exc:
logger.warning('SigID API request failed: %s', exc)
return None
if isinstance(data, dict) and data.get('error'):
logger.warning('SigID API returned error: %s', data.get('error'))
return None
return data if isinstance(data, dict) else None
def _ask_query(query: str) -> dict[str, Any] | None:
return _fetch_api_json({
'action': 'ask',
'query': query,
'format': 'json',
})
def _search_query(search_text: str, limit: int) -> dict[str, Any] | None:
return _fetch_api_json({
'action': 'query',
'list': 'search',
'srsearch': search_text,
'srlimit': str(limit),
'format': 'json',
})
def _to_float_list(values: Any) -> list[float]:
if not isinstance(values, list):
return []
out: list[float] = []
for value in values:
try:
out.append(float(value))
except (TypeError, ValueError):
continue
return out
def _to_text_list(values: Any) -> list[str]:
if not isinstance(values, list):
return []
out: list[str] = []
for value in values:
text = str(value or '').strip()
if text:
out.append(text)
return out
def _normalize_modes(values: list[str]) -> list[str]:
out: list[str] = []
for value in values:
for token in str(value).replace('/', ',').split(','):
mode = token.strip().upper()
if mode and mode not in out:
out.append(mode)
return out
def _extract_matches_from_ask(data: dict[str, Any]) -> list[dict[str, Any]]:
results = data.get('query', {}).get('results', {})
if not isinstance(results, dict):
return []
matches: list[dict[str, Any]] = []
for title, entry in results.items():
if not isinstance(entry, dict):
continue
printouts = entry.get('printouts', {})
if not isinstance(printouts, dict):
printouts = {}
frequencies_hz = _to_float_list(printouts.get('Frequencies'))
frequencies_mhz = [round(v / 1e6, 6) for v in frequencies_hz if v > 0]
modes = _normalize_modes(_to_text_list(printouts.get('Mode')))
modulations = _normalize_modes(_to_text_list(printouts.get('Modulation')))
match = {
'title': str(entry.get('fulltext') or title),
'url': str(entry.get('fullurl') or ''),
'frequencies_mhz': frequencies_mhz,
'modes': modes,
'modulations': modulations,
'source': 'SigID Wiki',
}
matches.append(match)
return matches
def _dedupe_matches(matches: list[dict[str, Any]]) -> list[dict[str, Any]]:
deduped: dict[str, dict[str, Any]] = {}
for match in matches:
key = f"{match.get('title', '')}|{match.get('url', '')}"
if key not in deduped:
deduped[key] = match
continue
# Merge frequencies/modes/modulations from duplicates.
existing = deduped[key]
for field in ('frequencies_mhz', 'modes', 'modulations'):
base = existing.get(field, [])
extra = match.get(field, [])
if not isinstance(base, list):
base = []
if not isinstance(extra, list):
extra = []
merged = list(base)
for item in extra:
if item not in merged:
merged.append(item)
existing[field] = merged
return list(deduped.values())
def _rank_matches(
matches: list[dict[str, Any]],
*,
frequency_mhz: float,
modulation: str,
) -> list[dict[str, Any]]:
target_hz = frequency_mhz * 1e6
wanted_mod = str(modulation or '').strip().upper()
def score(match: dict[str, Any]) -> tuple[int, float, str]:
score_value = 0
freqs_mhz = match.get('frequencies_mhz') or []
distances_hz: list[float] = []
for f_mhz in freqs_mhz:
try:
distances_hz.append(abs((float(f_mhz) * 1e6) - target_hz))
except (TypeError, ValueError):
continue
min_distance_hz = min(distances_hz) if distances_hz else 1e12
if min_distance_hz <= 100:
score_value += 120
elif min_distance_hz <= 1_000:
score_value += 90
elif min_distance_hz <= 10_000:
score_value += 70
elif min_distance_hz <= 100_000:
score_value += 40
if wanted_mod:
modes = [str(v).upper() for v in (match.get('modes') or [])]
modulations = [str(v).upper() for v in (match.get('modulations') or [])]
if wanted_mod in modes:
score_value += 25
if wanted_mod in modulations:
score_value += 25
title = str(match.get('title') or '')
title_lower = title.lower()
if 'unidentified' in title_lower or 'unknown' in title_lower:
score_value -= 10
return (score_value, min_distance_hz, title.lower())
ranked = sorted(matches, key=score, reverse=True)
for match in ranked:
try:
nearest = min(abs((float(f) * 1e6) - target_hz) for f in (match.get('frequencies_mhz') or []))
match['distance_hz'] = int(round(nearest))
except Exception:
match['distance_hz'] = None
return ranked
def _format_freq_variants_mhz(freq_mhz: float) -> list[str]:
variants = [
f'{freq_mhz:.6f}'.rstrip('0').rstrip('.'),
f'{freq_mhz:.4f}'.rstrip('0').rstrip('.'),
f'{freq_mhz:.3f}'.rstrip('0').rstrip('.'),
]
out: list[str] = []
for value in variants:
if value and value not in out:
out.append(value)
return out
def _lookup_sigidwiki_matches(frequency_mhz: float, modulation: str, limit: int) -> dict[str, Any]:
all_matches: list[dict[str, Any]] = []
exact_queries: list[str] = []
for freq_token in _format_freq_variants_mhz(frequency_mhz):
query = (
f'[[Category:Signal]][[Frequencies::{freq_token} MHz]]'
f'|?Frequencies|?Mode|?Modulation|limit={max(10, limit * 2)}'
)
exact_queries.append(query)
data = _ask_query(query)
if data:
all_matches.extend(_extract_matches_from_ask(data))
if all_matches:
break
search_used = False
if not all_matches:
search_used = True
search_terms = [f'{frequency_mhz:.4f} MHz']
if modulation:
search_terms.insert(0, f'{frequency_mhz:.4f} MHz {modulation.upper()}')
seen_titles: set[str] = set()
for term in search_terms:
search_data = _search_query(term, max(5, min(limit * 2, 10)))
search_results = search_data.get('query', {}).get('search', []) if isinstance(search_data, dict) else []
if not isinstance(search_results, list) or not search_results:
continue
for item in search_results:
title = str(item.get('title') or '').strip()
if not title or title in seen_titles:
continue
seen_titles.add(title)
page_query = f'[[{title}]]|?Frequencies|?Mode|?Modulation|limit=1'
page_data = _ask_query(page_query)
if page_data:
all_matches.extend(_extract_matches_from_ask(page_data))
if len(all_matches) >= max(limit * 3, 12):
break
if all_matches:
break
deduped = _dedupe_matches(all_matches)
ranked = _rank_matches(deduped, frequency_mhz=frequency_mhz, modulation=modulation)
return {
'matches': ranked[:limit],
'search_used': search_used,
'exact_queries': exact_queries,
}
@signalid_bp.route('/sigidwiki', methods=['POST'])
def sigidwiki_lookup() -> Response:
"""Lookup likely signal types from SigID Wiki by tuned frequency."""
payload = request.get_json(silent=True) or {}
freq_raw = payload.get('frequency_mhz')
if freq_raw is None:
return jsonify({'status': 'error', 'message': 'frequency_mhz is required'}), 400
try:
frequency_mhz = float(freq_raw)
except (TypeError, ValueError):
return jsonify({'status': 'error', 'message': 'Invalid frequency_mhz'}), 400
if frequency_mhz <= 0:
return jsonify({'status': 'error', 'message': 'frequency_mhz must be positive'}), 400
modulation = str(payload.get('modulation') or '').strip().upper()
if modulation and len(modulation) > 16:
modulation = modulation[:16]
limit_raw = payload.get('limit', 8)
try:
limit = int(limit_raw)
except (TypeError, ValueError):
limit = 8
limit = max(1, min(limit, 20))
cache_key = f'{round(frequency_mhz, 6)}|{modulation}|{limit}'
cached = _cache_get(cache_key)
if cached is not None:
return jsonify({
'status': 'ok',
'source': 'sigidwiki',
'frequency_mhz': round(frequency_mhz, 6),
'modulation': modulation or None,
'cached': True,
**cached,
})
try:
lookup = _lookup_sigidwiki_matches(frequency_mhz, modulation, limit)
except Exception as exc:
logger.error('SigID lookup failed: %s', exc)
return jsonify({'status': 'error', 'message': 'SigID lookup failed'}), 502
response_payload = {
'matches': lookup.get('matches', []),
'match_count': len(lookup.get('matches', [])),
'search_used': bool(lookup.get('search_used')),
'exact_queries': lookup.get('exact_queries', []),
}
_cache_set(cache_key, response_payload)
return jsonify({
'status': 'ok',
'source': 'sigidwiki',
'frequency_mhz': round(frequency_mhz, 6),
'modulation': modulation or None,
'cached': False,
**response_payload,
})