From 58222b34745e9838371e186fcb5fa07567196c68 Mon Sep 17 00:00:00 2001 From: James Smith Date: Sun, 3 May 2026 09:24:22 +0100 Subject: [PATCH] fix(hackrf): resolve 'Tools Missing' on RPi when hackrf_info is present MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two root causes behind HackRF showing as unavailable when tools are installed: 1. get_tool_path() didn't search /usr/local/bin on Linux. HackRF tools built from source (as in the Dockerfile) land there, but the path wasn't checked when sudo/service environments have a restricted PATH. 2. check_hackrf() only tested hackrf_transfer, but the health check tests hackrf_info — both come from the same apt package but a user could have one visible and not the other. Now either binary confirms the tools are present. hackrf_transfer is still required for actual RX/TX operations. Fixes #212 Co-Authored-By: Claude Sonnet 4.6 --- utils/dependencies.py | 819 ++++++++++--------- utils/subghz.py | 1748 ++++++++++++++++++++++------------------- 2 files changed, 1357 insertions(+), 1210 deletions(-) diff --git a/utils/dependencies.py b/utils/dependencies.py index e482bb2..01597d3 100644 --- a/utils/dependencies.py +++ b/utils/dependencies.py @@ -7,16 +7,16 @@ import shutil import subprocess from typing import Any -logger = logging.getLogger('intercept.dependencies') +logger = logging.getLogger("intercept.dependencies") -# Additional paths to search for tools (e.g., /usr/sbin on Debian) -EXTRA_TOOL_PATHS = ['/usr/sbin', '/sbin'] +# Additional paths to search for tools (e.g., /usr/sbin on Debian, /usr/local/bin for source builds) +EXTRA_TOOL_PATHS = ["/usr/local/bin", "/usr/sbin", "/sbin"] # Tools installed to non-standard locations (not on PATH) KNOWN_TOOL_PATHS: dict[str, list[str]] = { - 'auto_rx.py': [ - '/opt/radiosonde_auto_rx/auto_rx/auto_rx.py', - '/opt/auto_rx/auto_rx.py', + "auto_rx.py": [ + "/opt/radiosonde_auto_rx/auto_rx/auto_rx.py", + "/opt/auto_rx/auto_rx.py", ], } @@ -36,12 +36,12 @@ def get_tool_path(name: str) -> str | None: # Prefer native Homebrew binaries on Apple Silicon to avoid mixing Rosetta # /usr/local tools with arm64 Python/runtime. - if platform.system() == 'Darwin': + if platform.system() == "Darwin": machine = platform.machine().lower() preferred_paths: list[str] = [] - if machine in {'arm64', 'aarch64'}: - preferred_paths.append('/opt/homebrew/bin') - preferred_paths.append('/usr/local/bin') + if machine in {"arm64", "aarch64"}: + preferred_paths.append("/opt/homebrew/bin") + preferred_paths.append("/usr/local/bin") for base in preferred_paths: full_path = os.path.join(base, name) @@ -78,31 +78,32 @@ def _get_soapy_env() -> dict[str, str]: See: https://github.com/smittix/intercept/issues/77 """ import platform + env = os.environ.copy() - if platform.system() == 'Darwin': + if platform.system() == "Darwin": # Homebrew paths for Apple Silicon and Intel Macs - homebrew_paths = ['/opt/homebrew', '/usr/local'] + homebrew_paths = ["/opt/homebrew", "/usr/local"] lib_paths = [] module_paths = [] for base in homebrew_paths: - lib_path = f'{base}/lib' + lib_path = f"{base}/lib" if os.path.isdir(lib_path): lib_paths.append(lib_path) # SoapySDR modules are in lib/SoapySDR/modules - soapy_mod_base = f'{base}/lib/SoapySDR' + soapy_mod_base = f"{base}/lib/SoapySDR" if os.path.isdir(soapy_mod_base): module_paths.append(soapy_mod_base) if lib_paths: - current_dyld = env.get('DYLD_LIBRARY_PATH', '') - env['DYLD_LIBRARY_PATH'] = ':'.join(lib_paths + ([current_dyld] if current_dyld else [])) + current_dyld = env.get("DYLD_LIBRARY_PATH", "") + env["DYLD_LIBRARY_PATH"] = ":".join(lib_paths + ([current_dyld] if current_dyld else [])) # Set SOAPY_SDR_ROOT if we found Homebrew installation for base in homebrew_paths: - if os.path.isdir(f'{base}/lib/SoapySDR'): - env['SOAPY_SDR_ROOT'] = base + if os.path.isdir(f"{base}/lib/SoapySDR"): + env["SOAPY_SDR_ROOT"] = base break return env @@ -114,7 +115,7 @@ def check_soapy_factory(factory_name: str) -> bool: # Run SoapySDRUtil --info and look for the factory in 'Available factories' # Use macOS-aware environment to find Homebrew-installed modules env = _get_soapy_env() - result = subprocess.run(['SoapySDRUtil', '--info'], capture_output=True, text=True, env=env) + result = subprocess.run(["SoapySDRUtil", "--info"], capture_output=True, text=True, env=env) if result.returncode != 0: return False @@ -134,395 +135,390 @@ def check_soapy_factory(factory_name: str) -> bool: # Comprehensive tool dependency definitions TOOL_DEPENDENCIES = { - 'pager': { - 'name': 'Pager Decoding', - 'tools': { - 'rtl_fm': { - 'required': True, - 'description': 'RTL-SDR FM demodulator', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } - }, - 'multimon-ng': { - 'required': True, - 'description': 'Digital transmission decoder', - 'install': { - 'apt': 'sudo apt install multimon-ng', - 'brew': 'brew install multimon-ng', - 'manual': 'https://github.com/EliasOenal/multimon-ng' - } - }, - 'rtl_test': { - 'required': False, - 'description': 'RTL-SDR device detection', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } - } - } - }, - 'sensor': { - 'name': '433MHz Sensors', - 'tools': { - 'rtl_433': { - 'required': True, - 'description': 'ISM band decoder for sensors, weather stations, TPMS', - 'install': { - 'apt': 'sudo apt install rtl-433', - 'brew': 'brew install rtl_433', - 'manual': 'https://github.com/merbanan/rtl_433' - } - } - } - }, - 'wifi': { - 'name': 'WiFi Reconnaissance', - 'tools': { - 'airmon-ng': { - 'required': True, - 'description': 'Monitor mode controller', - 'install': { - 'apt': 'sudo apt install aircrack-ng', - 'brew': 'Not available on macOS', - 'manual': 'https://aircrack-ng.org' - } - }, - 'airodump-ng': { - 'required': True, - 'description': 'WiFi network scanner', - 'install': { - 'apt': 'sudo apt install aircrack-ng', - 'brew': 'Not available on macOS', - 'manual': 'https://aircrack-ng.org' - } - }, - 'aireplay-ng': { - 'required': False, - 'description': 'Deauthentication / packet injection', - 'install': { - 'apt': 'sudo apt install aircrack-ng', - 'brew': 'Not available on macOS', - 'manual': 'https://aircrack-ng.org' - } - }, - 'aircrack-ng': { - 'required': False, - 'description': 'Handshake verification', - 'install': { - 'apt': 'sudo apt install aircrack-ng', - 'brew': 'brew install aircrack-ng', - 'manual': 'https://aircrack-ng.org' - } - }, - 'hcxdumptool': { - 'required': False, - 'description': 'PMKID capture tool', - 'install': { - 'apt': 'sudo apt install hcxdumptool', - 'brew': 'brew install hcxtools', - 'manual': 'https://github.com/ZerBea/hcxdumptool' - } - }, - 'hcxpcapngtool': { - 'required': False, - 'description': 'PMKID hash extractor', - 'install': { - 'apt': 'sudo apt install hcxtools', - 'brew': 'brew install hcxtools', - 'manual': 'https://github.com/ZerBea/hcxtools' - } - } - } - }, - 'bluetooth': { - 'name': 'Bluetooth Scanning', - 'tools': { - 'hcitool': { - 'required': False, - 'description': 'Bluetooth HCI tool (legacy)', - 'install': { - 'apt': 'sudo apt install bluez', - 'brew': 'Not available on macOS (use native)', - 'manual': 'http://www.bluez.org' - } - }, - 'bluetoothctl': { - 'required': True, - 'description': 'Modern Bluetooth controller', - 'install': { - 'apt': 'sudo apt install bluez', - 'brew': 'Not available on macOS (use native)', - 'manual': 'http://www.bluez.org' - } - }, - 'hciconfig': { - 'required': False, - 'description': 'Bluetooth adapter configuration', - 'install': { - 'apt': 'sudo apt install bluez', - 'brew': 'Not available on macOS', - 'manual': 'http://www.bluez.org' - } - } - } - }, - 'aircraft': { - 'name': 'Aircraft Tracking (ADS-B)', - 'tools': { - 'dump1090': { - 'required': False, - 'description': 'Mode S / ADS-B decoder (preferred)', - 'install': { - 'apt': 'sudo apt install dump1090-mutability (or build dump1090-fa from source)', - 'brew': 'brew install dump1090-mutability', - 'manual': 'https://github.com/flightaware/dump1090' + "pager": { + "name": "Pager Decoding", + "tools": { + "rtl_fm": { + "required": True, + "description": "RTL-SDR FM demodulator", + "install": { + "apt": "sudo apt install rtl-sdr", + "brew": "brew install librtlsdr", + "manual": "https://osmocom.org/projects/rtl-sdr/wiki", }, - 'alternatives': ['dump1090-mutability', 'dump1090-fa'] }, - 'rtl_adsb': { - 'required': False, - 'description': 'Simple ADS-B decoder', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } - } - } - }, - 'acars': { - 'name': 'Aircraft Messaging (ACARS)', - 'tools': { - 'acarsdec': { - 'required': True, - 'description': 'ACARS VHF decoder', - 'install': { - 'apt': 'Run ./setup.sh (builds from source)', - 'brew': 'Run ./setup.sh (builds from source)', - 'manual': 'https://github.com/TLeconte/acarsdec' - } - } - } - }, - 'ais': { - 'name': 'Vessel Tracking (AIS)', - 'tools': { - 'AIS-catcher': { - 'required': True, - 'description': 'AIS receiver and decoder', - 'install': { - 'apt': 'Download .deb from https://github.com/jvde-github/AIS-catcher/releases', - 'brew': 'brew install aiscatcher', - 'manual': 'https://github.com/jvde-github/AIS-catcher/releases' - } - } - } - }, - 'aprs': { - 'name': 'APRS Tracking', - 'tools': { - 'direwolf': { - 'required': False, - 'description': 'APRS/packet radio decoder (preferred)', - 'install': { - 'apt': 'sudo apt install direwolf', - 'brew': 'brew install direwolf', - 'manual': 'https://github.com/wb2osz/direwolf' - } - }, - 'multimon-ng': { - 'required': False, - 'description': 'Alternative AFSK1200 decoder', - 'install': { - 'apt': 'sudo apt install multimon-ng', - 'brew': 'brew install multimon-ng', - 'manual': 'https://github.com/EliasOenal/multimon-ng' - } - } - } - }, - 'satellite': { - 'name': 'Satellite Tracking', - 'tools': { - 'skyfield': { - 'required': True, - 'description': 'Python orbital mechanics library', - 'install': { - 'pip': 'pip install skyfield', - 'manual': 'https://rhodesmill.org/skyfield/' + "multimon-ng": { + "required": True, + "description": "Digital transmission decoder", + "install": { + "apt": "sudo apt install multimon-ng", + "brew": "brew install multimon-ng", + "manual": "https://github.com/EliasOenal/multimon-ng", }, - 'python_module': True - } - } + }, + "rtl_test": { + "required": False, + "description": "RTL-SDR device detection", + "install": { + "apt": "sudo apt install rtl-sdr", + "brew": "brew install librtlsdr", + "manual": "https://osmocom.org/projects/rtl-sdr/wiki", + }, + }, + }, }, - 'sdr_hardware': { - 'name': 'SDR Hardware Support', - 'tools': { - 'SoapySDRUtil': { - 'required': False, - 'description': 'Universal SDR abstraction (required for LimeSDR, HackRF)', - 'install': { - 'apt': 'sudo apt install soapysdr-tools', - 'brew': 'brew install soapysdr', - 'manual': 'https://github.com/pothosware/SoapySDR' - } - }, - 'rx_fm': { - 'required': False, - 'description': 'SoapySDR FM receiver (for non-RTL hardware)', - 'install': { - 'manual': 'Part of SoapySDR utilities or build from source' - } - }, - 'LimeUtil': { - 'required': False, - 'description': 'LimeSDR native utilities', - 'install': { - 'apt': 'sudo apt install limesuite', - 'brew': 'brew install limesuite', - 'manual': 'https://github.com/myriadrf/LimeSuite' - } - }, - 'SoapyLMS7': { - 'required': False, - 'description': 'SoapySDR plugin for LimeSDR', - 'soapy_factory': 'lime', - 'install': { - 'apt': 'sudo apt install soapysdr-module-lms7', - 'brew': 'brew install soapylms7', - 'manual': 'https://github.com/myriadrf/LimeSuite' - } - }, - 'hackrf_info': { - 'required': False, - 'description': 'HackRF native utilities', - 'install': { - 'apt': 'sudo apt install hackrf', - 'brew': 'brew install hackrf', - 'manual': 'https://github.com/greatscottgadgets/hackrf' - } - }, - 'SoapyHackRF': { - 'required': False, - 'description': 'SoapySDR plugin for HackRF', - 'soapy_factory': 'hackrf', - 'install': { - 'apt': 'sudo apt install soapysdr-module-hackrf', - 'brew': 'brew install soapyhackrf', - 'manual': 'https://github.com/pothosware/SoapyHackRF' - } - }, - 'readsb': { - 'required': False, - 'description': 'ADS-B decoder with SoapySDR support', - 'install': { - 'apt': 'Build from source with SoapySDR support', - 'brew': 'Build from source with SoapySDR support', - 'manual': 'https://github.com/wiedehopf/readsb' - } + "sensor": { + "name": "433MHz Sensors", + "tools": { + "rtl_433": { + "required": True, + "description": "ISM band decoder for sensors, weather stations, TPMS", + "install": { + "apt": "sudo apt install rtl-433", + "brew": "brew install rtl_433", + "manual": "https://github.com/merbanan/rtl_433", + }, } - } + }, }, - 'subghz': { - 'name': 'SubGHz Transceiver', - 'tools': { - 'hackrf_transfer': { - 'required': True, - 'description': 'HackRF IQ capture and replay', - 'install': { - 'apt': 'sudo apt install hackrf', - 'brew': 'brew install hackrf', - 'manual': 'https://github.com/greatscottgadgets/hackrf' - } + "wifi": { + "name": "WiFi Reconnaissance", + "tools": { + "airmon-ng": { + "required": True, + "description": "Monitor mode controller", + "install": { + "apt": "sudo apt install aircrack-ng", + "brew": "Not available on macOS", + "manual": "https://aircrack-ng.org", + }, }, - 'hackrf_sweep': { - 'required': False, - 'description': 'HackRF wideband spectrum sweep', - 'install': { - 'apt': 'sudo apt install hackrf', - 'brew': 'brew install hackrf', - 'manual': 'https://github.com/greatscottgadgets/hackrf' - } + "airodump-ng": { + "required": True, + "description": "WiFi network scanner", + "install": { + "apt": "sudo apt install aircrack-ng", + "brew": "Not available on macOS", + "manual": "https://aircrack-ng.org", + }, }, - 'rtl_433': { - 'required': False, - 'description': 'Protocol decoder for SubGHz signals', - 'install': { - 'apt': 'sudo apt install rtl-433', - 'brew': 'brew install rtl_433', - 'manual': 'https://github.com/merbanan/rtl_433' - } - } - } + "aireplay-ng": { + "required": False, + "description": "Deauthentication / packet injection", + "install": { + "apt": "sudo apt install aircrack-ng", + "brew": "Not available on macOS", + "manual": "https://aircrack-ng.org", + }, + }, + "aircrack-ng": { + "required": False, + "description": "Handshake verification", + "install": { + "apt": "sudo apt install aircrack-ng", + "brew": "brew install aircrack-ng", + "manual": "https://aircrack-ng.org", + }, + }, + "hcxdumptool": { + "required": False, + "description": "PMKID capture tool", + "install": { + "apt": "sudo apt install hcxdumptool", + "brew": "brew install hcxtools", + "manual": "https://github.com/ZerBea/hcxdumptool", + }, + }, + "hcxpcapngtool": { + "required": False, + "description": "PMKID hash extractor", + "install": { + "apt": "sudo apt install hcxtools", + "brew": "brew install hcxtools", + "manual": "https://github.com/ZerBea/hcxtools", + }, + }, + }, }, - 'radiosonde': { - 'name': 'Radiosonde Tracking', - 'tools': { - 'auto_rx.py': { - 'required': True, - 'description': 'Radiosonde weather balloon decoder', - 'install': { - 'apt': 'Run ./setup.sh (clones from GitHub)', - 'brew': 'Run ./setup.sh (clones from GitHub)', - 'manual': 'https://github.com/projecthorus/radiosonde_auto_rx' - } - } - } + "bluetooth": { + "name": "Bluetooth Scanning", + "tools": { + "hcitool": { + "required": False, + "description": "Bluetooth HCI tool (legacy)", + "install": { + "apt": "sudo apt install bluez", + "brew": "Not available on macOS (use native)", + "manual": "http://www.bluez.org", + }, + }, + "bluetoothctl": { + "required": True, + "description": "Modern Bluetooth controller", + "install": { + "apt": "sudo apt install bluez", + "brew": "Not available on macOS (use native)", + "manual": "http://www.bluez.org", + }, + }, + "hciconfig": { + "required": False, + "description": "Bluetooth adapter configuration", + "install": { + "apt": "sudo apt install bluez", + "brew": "Not available on macOS", + "manual": "http://www.bluez.org", + }, + }, + }, }, - 'tscm': { - 'name': 'TSCM Counter-Surveillance', - 'tools': { - 'rtl_power': { - 'required': False, - 'description': 'Wideband spectrum sweep for RF analysis', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } + "aircraft": { + "name": "Aircraft Tracking (ADS-B)", + "tools": { + "dump1090": { + "required": False, + "description": "Mode S / ADS-B decoder (preferred)", + "install": { + "apt": "sudo apt install dump1090-mutability (or build dump1090-fa from source)", + "brew": "brew install dump1090-mutability", + "manual": "https://github.com/flightaware/dump1090", + }, + "alternatives": ["dump1090-mutability", "dump1090-fa"], }, - 'rtl_fm': { - 'required': True, - 'description': 'RF signal demodulation', - 'install': { - 'apt': 'sudo apt install rtl-sdr', - 'brew': 'brew install librtlsdr', - 'manual': 'https://osmocom.org/projects/rtl-sdr/wiki' - } + "rtl_adsb": { + "required": False, + "description": "Simple ADS-B decoder", + "install": { + "apt": "sudo apt install rtl-sdr", + "brew": "brew install librtlsdr", + "manual": "https://osmocom.org/projects/rtl-sdr/wiki", + }, }, - 'rtl_433': { - 'required': False, - 'description': 'ISM band device decoding', - 'install': { - 'apt': 'sudo apt install rtl-433', - 'brew': 'brew install rtl_433', - 'manual': 'https://github.com/merbanan/rtl_433' - } - }, - 'airmon-ng': { - 'required': False, - 'description': 'WiFi monitor mode for network scanning', - 'install': { - 'apt': 'sudo apt install aircrack-ng', - 'brew': 'Not available on macOS', - 'manual': 'https://aircrack-ng.org' - } - }, - 'bluetoothctl': { - 'required': False, - 'description': 'Bluetooth device scanning', - 'install': { - 'apt': 'sudo apt install bluez', - 'brew': 'Not available on macOS (use native)', - 'manual': 'http://www.bluez.org' - } + }, + }, + "acars": { + "name": "Aircraft Messaging (ACARS)", + "tools": { + "acarsdec": { + "required": True, + "description": "ACARS VHF decoder", + "install": { + "apt": "Run ./setup.sh (builds from source)", + "brew": "Run ./setup.sh (builds from source)", + "manual": "https://github.com/TLeconte/acarsdec", + }, } - } + }, + }, + "ais": { + "name": "Vessel Tracking (AIS)", + "tools": { + "AIS-catcher": { + "required": True, + "description": "AIS receiver and decoder", + "install": { + "apt": "Download .deb from https://github.com/jvde-github/AIS-catcher/releases", + "brew": "brew install aiscatcher", + "manual": "https://github.com/jvde-github/AIS-catcher/releases", + }, + } + }, + }, + "aprs": { + "name": "APRS Tracking", + "tools": { + "direwolf": { + "required": False, + "description": "APRS/packet radio decoder (preferred)", + "install": { + "apt": "sudo apt install direwolf", + "brew": "brew install direwolf", + "manual": "https://github.com/wb2osz/direwolf", + }, + }, + "multimon-ng": { + "required": False, + "description": "Alternative AFSK1200 decoder", + "install": { + "apt": "sudo apt install multimon-ng", + "brew": "brew install multimon-ng", + "manual": "https://github.com/EliasOenal/multimon-ng", + }, + }, + }, + }, + "satellite": { + "name": "Satellite Tracking", + "tools": { + "skyfield": { + "required": True, + "description": "Python orbital mechanics library", + "install": {"pip": "pip install skyfield", "manual": "https://rhodesmill.org/skyfield/"}, + "python_module": True, + } + }, + }, + "sdr_hardware": { + "name": "SDR Hardware Support", + "tools": { + "SoapySDRUtil": { + "required": False, + "description": "Universal SDR abstraction (required for LimeSDR, HackRF)", + "install": { + "apt": "sudo apt install soapysdr-tools", + "brew": "brew install soapysdr", + "manual": "https://github.com/pothosware/SoapySDR", + }, + }, + "rx_fm": { + "required": False, + "description": "SoapySDR FM receiver (for non-RTL hardware)", + "install": {"manual": "Part of SoapySDR utilities or build from source"}, + }, + "LimeUtil": { + "required": False, + "description": "LimeSDR native utilities", + "install": { + "apt": "sudo apt install limesuite", + "brew": "brew install limesuite", + "manual": "https://github.com/myriadrf/LimeSuite", + }, + }, + "SoapyLMS7": { + "required": False, + "description": "SoapySDR plugin for LimeSDR", + "soapy_factory": "lime", + "install": { + "apt": "sudo apt install soapysdr-module-lms7", + "brew": "brew install soapylms7", + "manual": "https://github.com/myriadrf/LimeSuite", + }, + }, + "hackrf_info": { + "required": False, + "description": "HackRF native utilities", + "install": { + "apt": "sudo apt install hackrf", + "brew": "brew install hackrf", + "manual": "https://github.com/greatscottgadgets/hackrf", + }, + }, + "SoapyHackRF": { + "required": False, + "description": "SoapySDR plugin for HackRF", + "soapy_factory": "hackrf", + "install": { + "apt": "sudo apt install soapysdr-module-hackrf", + "brew": "brew install soapyhackrf", + "manual": "https://github.com/pothosware/SoapyHackRF", + }, + }, + "readsb": { + "required": False, + "description": "ADS-B decoder with SoapySDR support", + "install": { + "apt": "Build from source with SoapySDR support", + "brew": "Build from source with SoapySDR support", + "manual": "https://github.com/wiedehopf/readsb", + }, + }, + }, + }, + "subghz": { + "name": "SubGHz Transceiver", + "tools": { + "hackrf_transfer": { + "required": True, + "description": "HackRF IQ capture and replay", + "install": { + "apt": "sudo apt install hackrf", + "brew": "brew install hackrf", + "manual": "https://github.com/greatscottgadgets/hackrf", + }, + }, + "hackrf_sweep": { + "required": False, + "description": "HackRF wideband spectrum sweep", + "install": { + "apt": "sudo apt install hackrf", + "brew": "brew install hackrf", + "manual": "https://github.com/greatscottgadgets/hackrf", + }, + }, + "rtl_433": { + "required": False, + "description": "Protocol decoder for SubGHz signals", + "install": { + "apt": "sudo apt install rtl-433", + "brew": "brew install rtl_433", + "manual": "https://github.com/merbanan/rtl_433", + }, + }, + }, + }, + "radiosonde": { + "name": "Radiosonde Tracking", + "tools": { + "auto_rx.py": { + "required": True, + "description": "Radiosonde weather balloon decoder", + "install": { + "apt": "Run ./setup.sh (clones from GitHub)", + "brew": "Run ./setup.sh (clones from GitHub)", + "manual": "https://github.com/projecthorus/radiosonde_auto_rx", + }, + } + }, + }, + "tscm": { + "name": "TSCM Counter-Surveillance", + "tools": { + "rtl_power": { + "required": False, + "description": "Wideband spectrum sweep for RF analysis", + "install": { + "apt": "sudo apt install rtl-sdr", + "brew": "brew install librtlsdr", + "manual": "https://osmocom.org/projects/rtl-sdr/wiki", + }, + }, + "rtl_fm": { + "required": True, + "description": "RF signal demodulation", + "install": { + "apt": "sudo apt install rtl-sdr", + "brew": "brew install librtlsdr", + "manual": "https://osmocom.org/projects/rtl-sdr/wiki", + }, + }, + "rtl_433": { + "required": False, + "description": "ISM band device decoding", + "install": { + "apt": "sudo apt install rtl-433", + "brew": "brew install rtl_433", + "manual": "https://github.com/merbanan/rtl_433", + }, + }, + "airmon-ng": { + "required": False, + "description": "WiFi monitor mode for network scanning", + "install": { + "apt": "sudo apt install aircrack-ng", + "brew": "Not available on macOS", + "manual": "https://aircrack-ng.org", + }, + }, + "bluetoothctl": { + "required": False, + "description": "Bluetooth device scanning", + "install": { + "apt": "sudo apt install bluez", + "brew": "Not available on macOS (use native)", + "manual": "http://www.bluez.org", + }, + }, + }, }, } @@ -532,16 +528,11 @@ def check_all_dependencies() -> dict[str, dict[str, Any]]: results: dict[str, dict[str, Any]] = {} for mode, config in TOOL_DEPENDENCIES.items(): - mode_result = { - 'name': config['name'], - 'tools': {}, - 'ready': True, - 'missing_required': [] - } + mode_result = {"name": config["name"], "tools": {}, "ready": True, "missing_required": []} - for tool, tool_config in config['tools'].items(): + for tool, tool_config in config["tools"].items(): # Check if it's a Python module - if tool_config.get('python_module'): + if tool_config.get("python_module"): try: __import__(tool) installed = True @@ -549,23 +540,23 @@ def check_all_dependencies() -> dict[str, dict[str, Any]]: logger.debug(f"Failed to import {tool}: {type(e).__name__}: {e}") installed = False # Check using SoapySDRUtil if specified - elif tool_config.get('soapy_factory'): - installed = check_soapy_factory(tool_config['soapy_factory']) + elif tool_config.get("soapy_factory"): + installed = check_soapy_factory(tool_config["soapy_factory"]) else: # Check for alternatives - alternatives = tool_config.get('alternatives', []) + alternatives = tool_config.get("alternatives", []) installed = check_tool(tool) or any(check_tool(alt) for alt in alternatives) - mode_result['tools'][tool] = { - 'installed': installed, - 'required': tool_config['required'], - 'description': tool_config['description'], - 'install': tool_config['install'] + mode_result["tools"][tool] = { + "installed": installed, + "required": tool_config["required"], + "description": tool_config["description"], + "install": tool_config["install"], } - if tool_config['required'] and not installed: - mode_result['ready'] = False - mode_result['missing_required'].append(tool) + if tool_config["required"] and not installed: + mode_result["ready"] = False + mode_result["missing_required"].append(tool) results[mode] = mode_result diff --git a/utils/subghz.py b/utils/subghz.py index 5977946..54757d5 100644 --- a/utils/subghz.py +++ b/utils/subghz.py @@ -38,12 +38,13 @@ from utils.dependencies import get_tool_path from utils.logging import get_logger from utils.process import register_process, safe_terminate, unregister_process -logger = get_logger('intercept.subghz') +logger = get_logger("intercept.subghz") @dataclass class SubGhzCapture: """Metadata for a saved IQ capture.""" + capture_id: str filename: str frequency_hz: int @@ -53,15 +54,15 @@ class SubGhzCapture: timestamp: str duration_seconds: float = 0.0 size_bytes: int = 0 - label: str = '' - label_source: str = '' + label: str = "" + label_source: str = "" decoded_protocols: list[str] = field(default_factory=list) bursts: list[dict] = field(default_factory=list) - modulation_hint: str = '' + modulation_hint: str = "" modulation_confidence: float = 0.0 - protocol_hint: str = '' - dominant_fingerprint: str = '' - fingerprint_group: str = '' + protocol_hint: str = "" + dominant_fingerprint: str = "" + fingerprint_group: str = "" fingerprint_group_size: int = 0 trigger_enabled: bool = False trigger_pre_seconds: float = 0.0 @@ -69,39 +70,40 @@ class SubGhzCapture: def to_dict(self) -> dict: return { - 'id': self.capture_id, - 'filename': self.filename, - 'frequency_hz': self.frequency_hz, - 'sample_rate': self.sample_rate, - 'lna_gain': self.lna_gain, - 'vga_gain': self.vga_gain, - 'timestamp': self.timestamp, - 'duration_seconds': self.duration_seconds, - 'size_bytes': self.size_bytes, - 'label': self.label, - 'label_source': self.label_source, - 'decoded_protocols': self.decoded_protocols, - 'bursts': self.bursts, - 'modulation_hint': self.modulation_hint, - 'modulation_confidence': self.modulation_confidence, - 'protocol_hint': self.protocol_hint, - 'dominant_fingerprint': self.dominant_fingerprint, - 'fingerprint_group': self.fingerprint_group, - 'fingerprint_group_size': self.fingerprint_group_size, - 'trigger_enabled': self.trigger_enabled, - 'trigger_pre_seconds': self.trigger_pre_seconds, - 'trigger_post_seconds': self.trigger_post_seconds, + "id": self.capture_id, + "filename": self.filename, + "frequency_hz": self.frequency_hz, + "sample_rate": self.sample_rate, + "lna_gain": self.lna_gain, + "vga_gain": self.vga_gain, + "timestamp": self.timestamp, + "duration_seconds": self.duration_seconds, + "size_bytes": self.size_bytes, + "label": self.label, + "label_source": self.label_source, + "decoded_protocols": self.decoded_protocols, + "bursts": self.bursts, + "modulation_hint": self.modulation_hint, + "modulation_confidence": self.modulation_confidence, + "protocol_hint": self.protocol_hint, + "dominant_fingerprint": self.dominant_fingerprint, + "fingerprint_group": self.fingerprint_group, + "fingerprint_group_size": self.fingerprint_group_size, + "trigger_enabled": self.trigger_enabled, + "trigger_pre_seconds": self.trigger_pre_seconds, + "trigger_post_seconds": self.trigger_post_seconds, } @dataclass class SweepPoint: """A single frequency/power data point from hackrf_sweep.""" + freq_mhz: float power_dbm: float def to_dict(self) -> dict: - return {'freq': self.freq_mhz, 'power': self.power_dbm} + return {"freq": self.freq_mhz, "power": self.power_dbm} class SubGhzManager: @@ -112,8 +114,8 @@ class SubGhzManager: """ def __init__(self, data_dir: str | Path | None = None): - self._data_dir = Path(data_dir) if data_dir else Path('data/subghz') - self._captures_dir = self._data_dir / 'captures' + self._data_dir = Path(data_dir) if data_dir else Path("data/subghz") + self._captures_dir = self._data_dir / "captures" self._captures_dir.mkdir(parents=True, exist_ok=True) # Process state @@ -144,9 +146,9 @@ class SubGhzManager: self._rx_trigger_first_burst_start: float | None = None self._rx_trigger_last_burst_end: float | None = None self._rx_autostop_pending = False - self._rx_modulation_hint = '' + self._rx_modulation_hint = "" self._rx_modulation_confidence = 0.0 - self._rx_protocol_hint = '' + self._rx_protocol_hint = "" self._rx_fingerprint_counts: dict[str, int] = {} # Decode state @@ -158,7 +160,7 @@ class SubGhzManager: # TX state self._tx_start_time: float = 0 self._tx_watchdog: threading.Timer | None = None - self._tx_capture_id: str = '' + self._tx_capture_id: str = "" self._tx_temp_file: Path | None = None # Sweep state @@ -187,23 +189,28 @@ class SubGhzManager: except Exception as e: logger.error(f"Error in SubGHz callback: {e}") - # ------------------------------------------------------------------ - # Tool detection - # ------------------------------------------------------------------ - - def _resolve_tool(self, name: str) -> str | None: - """Resolve executable path via PATH first, then platform-aware fallbacks.""" - return shutil.which(name) or get_tool_path(name) - - def check_hackrf(self) -> bool: - if self._hackrf_available is None: - self._hackrf_available = self._resolve_tool('hackrf_transfer') is not None - return self._hackrf_available - - def check_hackrf_info(self) -> bool: - if self._hackrf_info_available is None: - self._hackrf_info_available = self._resolve_tool('hackrf_info') is not None - return self._hackrf_info_available + # ------------------------------------------------------------------ + # Tool detection + # ------------------------------------------------------------------ + + def _resolve_tool(self, name: str) -> str | None: + """Resolve executable path via PATH first, then platform-aware fallbacks.""" + return shutil.which(name) or get_tool_path(name) + + def check_hackrf(self) -> bool: + if self._hackrf_available is None: + # Either binary confirms the hackrf package is installed. hackrf_transfer + # may be absent when only soapysdr-module-hackrf was installed, but + # hackrf_info alone is sufficient to detect and report devices. + self._hackrf_available = ( + self._resolve_tool("hackrf_transfer") is not None or self._resolve_tool("hackrf_info") is not None + ) + return self._hackrf_available + + def check_hackrf_info(self) -> bool: + if self._hackrf_info_available is None: + self._hackrf_info_available = self._resolve_tool("hackrf_info") is not None + return self._hackrf_info_available def check_hackrf_device(self) -> bool | None: """Return True if a HackRF device is detected, False if not, or None if detection unavailable.""" @@ -216,6 +223,7 @@ class SubGhzManager: try: from utils.sdr.detection import detect_hackrf_devices + connected = len(detect_hackrf_devices()) > 0 except Exception as exc: logger.debug(f"HackRF device detection failed: {exc}") @@ -229,18 +237,18 @@ class SubGhzManager: """Return an error string if HackRF is explicitly not detected.""" detected = self.check_hackrf_device() if detected is False: - return 'HackRF device not detected' + return "HackRF device not detected" return None - def check_rtl433(self) -> bool: - if self._rtl433_available is None: - self._rtl433_available = self._resolve_tool('rtl_433') is not None - return self._rtl433_available - - def check_sweep(self) -> bool: - if self._sweep_available is None: - self._sweep_available = self._resolve_tool('hackrf_sweep') is not None - return self._sweep_available + def check_rtl433(self) -> bool: + if self._rtl433_available is None: + self._rtl433_available = self._resolve_tool("rtl_433") is not None + return self._rtl433_available + + def check_sweep(self) -> bool: + if self._sweep_available is None: + self._sweep_available = self._resolve_tool("hackrf_sweep") is not None + return self._sweep_available # ------------------------------------------------------------------ # Status @@ -251,19 +259,19 @@ class SubGhzManager: """Return current active mode or 'idle'.""" with self._lock: if self._rx_process and self._rx_process.poll() is None: - return 'rx' + return "rx" if self._decode_process and self._decode_process.poll() is None: - return 'decode' + return "decode" if self._tx_process and self._tx_process.poll() is None: - return 'tx' + return "tx" if self._sweep_process and self._sweep_process.poll() is None: - return 'sweep' - return 'idle' + return "sweep" + return "idle" def get_status(self) -> dict: mode = self.active_mode hackrf_info_available = self.check_hackrf_info() - detect_paused = mode in {'rx', 'decode', 'tx', 'sweep'} + detect_paused = mode in {"rx", "decode", "tx", "sweep"} if detect_paused: # Avoid probing HackRF while a stream is active. A fresh "disconnected" # cache result should still surface to the UI, otherwise mark unknown. @@ -274,86 +282,97 @@ class SubGhzManager: else: hackrf_connected = self.check_hackrf_device() status: dict = { - 'mode': mode, - 'hackrf_available': self.check_hackrf(), - 'hackrf_info_available': hackrf_info_available, - 'hackrf_connected': hackrf_connected, - 'hackrf_detection_paused': detect_paused, - 'rtl433_available': self.check_rtl433(), - 'sweep_available': self.check_sweep(), + "mode": mode, + "hackrf_available": self.check_hackrf(), + "hackrf_info_available": hackrf_info_available, + "hackrf_connected": hackrf_connected, + "hackrf_detection_paused": detect_paused, + "rtl433_available": self.check_rtl433(), + "sweep_available": self.check_sweep(), } - if mode == 'rx': + if mode == "rx": elapsed = time.time() - self._rx_start_time if self._rx_start_time else 0 - status.update({ - 'frequency_hz': self._rx_frequency_hz, - 'sample_rate': self._rx_sample_rate, - 'elapsed_seconds': round(elapsed, 1), - 'trigger_enabled': self._rx_trigger_enabled, - 'trigger_pre_seconds': round(self._rx_trigger_pre_s, 3), - 'trigger_post_seconds': round(self._rx_trigger_post_s, 3), - }) - elif mode == 'decode': + status.update( + { + "frequency_hz": self._rx_frequency_hz, + "sample_rate": self._rx_sample_rate, + "elapsed_seconds": round(elapsed, 1), + "trigger_enabled": self._rx_trigger_enabled, + "trigger_pre_seconds": round(self._rx_trigger_pre_s, 3), + "trigger_post_seconds": round(self._rx_trigger_post_s, 3), + } + ) + elif mode == "decode": elapsed = time.time() - self._decode_start_time if self._decode_start_time else 0 - status.update({ - 'frequency_hz': self._decode_frequency_hz, - 'sample_rate': self._decode_sample_rate, - 'elapsed_seconds': round(elapsed, 1), - }) - elif mode == 'tx': + status.update( + { + "frequency_hz": self._decode_frequency_hz, + "sample_rate": self._decode_sample_rate, + "elapsed_seconds": round(elapsed, 1), + } + ) + elif mode == "tx": elapsed = time.time() - self._tx_start_time if self._tx_start_time else 0 - status.update({ - 'capture_id': self._tx_capture_id, - 'elapsed_seconds': round(elapsed, 1), - }) + status.update( + { + "capture_id": self._tx_capture_id, + "elapsed_seconds": round(elapsed, 1), + } + ) return status # ------------------------------------------------------------------ # RECEIVE (IQ capture via hackrf_transfer -r) # ------------------------------------------------------------------ - def start_receive( - self, - frequency_hz: int, - sample_rate: int = 2000000, + def start_receive( + self, + frequency_hz: int, + sample_rate: int = 2000000, lna_gain: int = 32, vga_gain: int = 20, trigger_enabled: bool = False, trigger_pre_ms: int = 350, - trigger_post_ms: int = 700, - device_serial: str | None = None, - ) -> dict: - # Pre-lock: tool availability & device detection (blocking I/O) - hackrf_transfer_path = self._resolve_tool('hackrf_transfer') - if not hackrf_transfer_path: - return {'status': 'error', 'message': 'hackrf_transfer not found'} - device_err = self._require_hackrf_device() - if device_err: - return {'status': 'error', 'message': device_err} + trigger_post_ms: int = 700, + device_serial: str | None = None, + ) -> dict: + # Pre-lock: tool availability & device detection (blocking I/O) + hackrf_transfer_path = self._resolve_tool("hackrf_transfer") + if not hackrf_transfer_path: + return {"status": "error", "message": "hackrf_transfer not found"} + device_err = self._require_hackrf_device() + if device_err: + return {"status": "error", "message": device_err} with self._lock: - if self.active_mode != 'idle': - return {'status': 'error', 'message': f'Already running: {self.active_mode}'} + if self.active_mode != "idle": + return {"status": "error", "message": f"Already running: {self.active_mode}"} # Validate gains lna_gain = max(SUBGHZ_LNA_GAIN_MIN, min(SUBGHZ_LNA_GAIN_MAX, lna_gain)) vga_gain = max(SUBGHZ_VGA_GAIN_MIN, min(SUBGHZ_VGA_GAIN_MAX, vga_gain)) # Generate filename - ts = datetime.now().strftime('%Y%m%d_%H%M%S') + ts = datetime.now().strftime("%Y%m%d_%H%M%S") freq_mhz = frequency_hz / 1_000_000 basename = f"{freq_mhz:.3f}MHz_{ts}" iq_file = self._captures_dir / f"{basename}.iq" - cmd = [ - hackrf_transfer_path, - '-r', str(iq_file), - '-f', str(frequency_hz), - '-s', str(sample_rate), - '-l', str(lna_gain), - '-g', str(vga_gain), + cmd = [ + hackrf_transfer_path, + "-r", + str(iq_file), + "-f", + str(frequency_hz), + "-s", + str(sample_rate), + "-l", + str(lna_gain), + "-g", + str(vga_gain), ] if device_serial: - cmd.extend(['-d', device_serial]) + cmd.extend(["-d", device_serial]) logger.info(f"SubGHz RX: {' '.join(cmd)}") @@ -362,7 +381,7 @@ class SubGhzManager: iq_file.touch(exist_ok=True) except OSError as e: logger.error(f"Failed to create RX file: {e}") - return {'status': 'error', 'message': 'Failed to create capture file'} + return {"status": "error", "message": "Failed to create capture file"} self._rx_process = subprocess.Popen( cmd, @@ -372,13 +391,13 @@ class SubGhzManager: register_process(self._rx_process) try: - self._rx_file_handle = open(iq_file, 'rb', buffering=0) + self._rx_file_handle = open(iq_file, "rb", buffering=0) except OSError as e: safe_terminate(self._rx_process) unregister_process(self._rx_process) self._rx_process = None logger.error(f"Failed to open RX file: {e}") - return {'status': 'error', 'message': 'Failed to open capture file'} + return {"status": "error", "message": "Failed to open capture file"} self._rx_start_time = time.time() self._rx_frequency_hz = frequency_hz @@ -395,9 +414,9 @@ class SubGhzManager: self._rx_trigger_first_burst_start = None self._rx_trigger_last_burst_end = None self._rx_autostop_pending = False - self._rx_modulation_hint = '' + self._rx_modulation_hint = "" self._rx_modulation_confidence = 0.0 - self._rx_protocol_hint = '' + self._rx_protocol_hint = "" self._rx_fingerprint_counts = {} # Start capture stream reader @@ -413,41 +432,45 @@ class SubGhzManager: daemon=True, ).start() - self._emit({ - 'type': 'status', - 'mode': 'rx', - 'status': 'started', - 'frequency_hz': frequency_hz, - 'sample_rate': sample_rate, - 'trigger_enabled': self._rx_trigger_enabled, - 'trigger_pre_seconds': round(self._rx_trigger_pre_s, 3), - 'trigger_post_seconds': round(self._rx_trigger_post_s, 3), - }) + self._emit( + { + "type": "status", + "mode": "rx", + "status": "started", + "frequency_hz": frequency_hz, + "sample_rate": sample_rate, + "trigger_enabled": self._rx_trigger_enabled, + "trigger_pre_seconds": round(self._rx_trigger_pre_s, 3), + "trigger_post_seconds": round(self._rx_trigger_post_s, 3), + } + ) if self._rx_trigger_enabled: - self._emit({ - 'type': 'info', - 'text': ( - f'[rx] Smart trigger armed ' - f'(pre {self._rx_trigger_pre_s:.2f}s, post {self._rx_trigger_post_s:.2f}s)' - ), - }) + self._emit( + { + "type": "info", + "text": ( + f"[rx] Smart trigger armed " + f"(pre {self._rx_trigger_pre_s:.2f}s, post {self._rx_trigger_post_s:.2f}s)" + ), + } + ) return { - 'status': 'started', - 'frequency_hz': frequency_hz, - 'sample_rate': sample_rate, - 'file': iq_file.name, - 'trigger_enabled': self._rx_trigger_enabled, - 'trigger_pre_seconds': round(self._rx_trigger_pre_s, 3), - 'trigger_post_seconds': round(self._rx_trigger_post_s, 3), + "status": "started", + "frequency_hz": frequency_hz, + "sample_rate": sample_rate, + "file": iq_file.name, + "trigger_enabled": self._rx_trigger_enabled, + "trigger_pre_seconds": round(self._rx_trigger_pre_s, 3), + "trigger_post_seconds": round(self._rx_trigger_post_s, 3), } except FileNotFoundError: - return {'status': 'error', 'message': 'hackrf_transfer not found'} + return {"status": "error", "message": "hackrf_transfer not found"} except Exception as e: logger.error(f"Failed to start RX: {e}") - return {'status': 'error', 'message': str(e)} + return {"status": "error", "message": str(e)} def _estimate_modulation_hint( self, @@ -455,22 +478,22 @@ class SubGhzManager: ) -> tuple[str, float, str]: """Estimate coarse modulation family from raw IQ characteristics.""" if not data: - return 'Unknown', 0.0, 'No samples' + return "Unknown", 0.0, "No samples" try: raw = np.frombuffer(data, dtype=np.int8).astype(np.float32) if raw.size < 2048: - return 'Unknown', 0.0, 'Insufficient samples' + return "Unknown", 0.0, "Insufficient samples" i_vals = raw[0::2] q_vals = raw[1::2] if i_vals.size == 0 or q_vals.size == 0: - return 'Unknown', 0.0, 'Invalid IQ frame' + return "Unknown", 0.0, "Invalid IQ frame" # Light decimation for lower CPU while preserving burst shape. i_vals = i_vals[::4] q_vals = q_vals[::4] if i_vals.size < 256 or q_vals.size < 256: - return 'Unknown', 0.0, 'Short frame' + return "Unknown", 0.0, "Short frame" iq = i_vals + 1j * q_vals amp = np.abs(iq) @@ -499,37 +522,34 @@ class SubGhzManager: mean_run = float(high.size) scores = { - 'OOK/ASK': 0.0, - 'FSK/GFSK': 0.0, - 'PWM/PPM': 0.0, + "OOK/ASK": 0.0, + "FSK/GFSK": 0.0, + "PWM/PPM": 0.0, } # OOK: stronger amplitude contrast and moderate pulse occupancy. - scores['OOK/ASK'] += max(0.0, min(1.0, (amp_cv - 0.22) / 0.35)) - scores['OOK/ASK'] += max(0.0, 1.0 - abs(pulse_density - 0.4) / 0.4) * 0.35 + scores["OOK/ASK"] += max(0.0, min(1.0, (amp_cv - 0.22) / 0.35)) + scores["OOK/ASK"] += max(0.0, 1.0 - abs(pulse_density - 0.4) / 0.4) * 0.35 # FSK: flatter amplitude, more phase movement. - scores['FSK/GFSK'] += max(0.0, min(1.0, (phase_var - 0.45) / 0.9)) - scores['FSK/GFSK'] += max(0.0, min(1.0, (0.33 - amp_cv) / 0.28)) * 0.45 + scores["FSK/GFSK"] += max(0.0, min(1.0, (phase_var - 0.45) / 0.9)) + scores["FSK/GFSK"] += max(0.0, min(1.0, (0.33 - amp_cv) / 0.28)) * 0.45 # PWM/PPM: high edge density with short run lengths. edge_density = 0.0 if mean_run <= 0 else min(1.0, 28.0 / max(mean_run, 1.0)) - scores['PWM/PPM'] += max(0.0, min(1.0, (amp_cv - 0.28) / 0.45)) - scores['PWM/PPM'] += edge_density * 0.6 + scores["PWM/PPM"] += max(0.0, min(1.0, (amp_cv - 0.28) / 0.45)) + scores["PWM/PPM"] += edge_density * 0.6 best_family = max(scores, key=scores.get) best_score = float(scores[best_family]) confidence = max(0.0, min(0.97, best_score)) if confidence < 0.25: - return 'Unknown', confidence, 'No clear modulation signature' + return "Unknown", confidence, "No clear modulation signature" - reason = ( - f'amp_cv={amp_cv:.2f} phase_var={phase_var:.2f} ' - f'pulse_density={pulse_density:.2f}' - ) + reason = f"amp_cv={amp_cv:.2f} phase_var={phase_var:.2f} pulse_density={pulse_density:.2f}" return best_family, confidence, reason except Exception: - return 'Unknown', 0.0, 'Modulation analysis failed' + return "Unknown", 0.0, "Modulation analysis failed" def _fingerprint_burst_bytes( self, @@ -539,20 +559,20 @@ class SubGhzManager: ) -> str: """Create a stable burst fingerprint for grouping similar signals.""" if not data: - return '' + return "" try: raw = np.frombuffer(data, dtype=np.int8).astype(np.float32) if raw.size < 512: - return '' + return "" i_vals = raw[0::2] q_vals = raw[1::2] if i_vals.size == 0 or q_vals.size == 0: - return '' + return "" amp = np.sqrt(i_vals * i_vals + q_vals * q_vals) if amp.size < 64: - return '' + return "" # Normalize and downsample envelope into a fixed-size shape vector. amp = amp - float(np.median(amp)) @@ -571,12 +591,12 @@ class SubGhzManager: sr_khz = int(max(1, round(sample_rate / 1000))) payload = ( quant.tobytes() - + burst_ms.to_bytes(2, 'little', signed=False) - + sr_khz.to_bytes(2, 'little', signed=False) + + burst_ms.to_bytes(2, "little", signed=False) + + sr_khz.to_bytes(2, "little", signed=False) ) return hashlib.sha1(payload).hexdigest()[:16] except Exception: - return '' + return "" def _protocol_hint_from_capture( self, @@ -585,22 +605,22 @@ class SubGhzManager: burst_count: int, ) -> str: freq = frequency_hz / 1_000_000 - mod = (modulation_hint or '').upper() + mod = (modulation_hint or "").upper() if burst_count <= 0: - return 'No burst activity' - if 433.70 <= freq <= 434.10 and 'OOK' in mod and burst_count >= 2: - return 'Likely weather sensor / simple remote telemetry' - if 868.0 <= freq <= 870.0 and 'OOK' in mod: - return 'Likely EU ISM OOK sensor/remote' - if 902.0 <= freq <= 928.0 and 'FSK' in mod: - return 'Likely ISM telemetry (FSK/GFSK)' - if 'PWM' in mod: - return 'Likely pulse-width/distance keyed remote' - if 'FSK' in mod: - return 'Likely continuous-tone telemetry' - if 'OOK' in mod: - return 'Likely OOK keyed burst transmitter' - return 'Unknown protocol family' + return "No burst activity" + if 433.70 <= freq <= 434.10 and "OOK" in mod and burst_count >= 2: + return "Likely weather sensor / simple remote telemetry" + if 868.0 <= freq <= 870.0 and "OOK" in mod: + return "Likely EU ISM OOK sensor/remote" + if 902.0 <= freq <= 928.0 and "FSK" in mod: + return "Likely ISM telemetry (FSK/GFSK)" + if "PWM" in mod: + return "Likely pulse-width/distance keyed remote" + if "FSK" in mod: + return "Likely continuous-tone telemetry" + if "OOK" in mod: + return "Likely OOK keyed burst transmitter" + return "Unknown protocol family" def _auto_capture_label( self, @@ -610,18 +630,18 @@ class SubGhzManager: protocol_hint: str, ) -> str: freq = frequency_hz / 1_000_000 - mod = (modulation_hint or '').upper() + mod = (modulation_hint or "").upper() if burst_count <= 0: - return f'Raw Capture {freq:.3f} MHz' - if 'weather' in protocol_hint.lower(): - return f'Weather-like Burst ({burst_count})' - if 'OOK' in mod: - return f'OOK Burst Cluster ({burst_count})' - if 'FSK' in mod: - return f'FSK Telemetry Burst ({burst_count})' - if 'PWM' in mod: - return f'PWM/PPM Burst ({burst_count})' - return f'RF Burst Capture ({burst_count})' + return f"Raw Capture {freq:.3f} MHz" + if "weather" in protocol_hint.lower(): + return f"Weather-like Burst ({burst_count})" + if "OOK" in mod: + return f"OOK Burst Cluster ({burst_count})" + if "FSK" in mod: + return f"FSK Telemetry Burst ({burst_count})" + if "PWM" in mod: + return f"PWM/PPM Burst ({burst_count})" + return f"RF Burst Capture ({burst_count})" def _trim_capture_to_trigger_window( self, @@ -634,11 +654,8 @@ class SubGhzManager: if not self._rx_trigger_enabled or not bursts or sample_rate <= 0: return duration_seconds, bursts - first_start = min(float(b.get('start_seconds', 0.0)) for b in bursts) - last_end = max( - float(b.get('start_seconds', 0.0)) + float(b.get('duration_seconds', 0.0)) - for b in bursts - ) + first_start = min(float(b.get("start_seconds", 0.0)) for b in bursts) + last_end = max(float(b.get("start_seconds", 0.0)) + float(b.get("duration_seconds", 0.0)) for b in bursts) start_s = max(0.0, first_start - self._rx_trigger_pre_s) end_s = min(duration_seconds, last_end + self._rx_trigger_post_s) if end_s <= start_s: @@ -652,9 +669,9 @@ class SubGhzManager: if end_byte <= start_byte: return duration_seconds, bursts - tmp_path = iq_file.with_suffix('.trimtmp') + tmp_path = iq_file.with_suffix(".trimtmp") try: - with open(iq_file, 'rb') as src, open(tmp_path, 'wb') as dst: + with open(iq_file, "rb") as src, open(tmp_path, "wb") as dst: src.seek(start_byte) remaining = end_byte - start_byte while remaining > 0: @@ -676,14 +693,14 @@ class SubGhzManager: trimmed_duration = max(0.0, float(end_byte - start_byte) / float(bytes_per_second)) adjusted_bursts: list[dict] = [] for burst in bursts: - raw_start = float(burst.get('start_seconds', 0.0)) - raw_dur = max(0.0, float(burst.get('duration_seconds', 0.0))) + raw_start = float(burst.get("start_seconds", 0.0)) + raw_dur = max(0.0, float(burst.get("duration_seconds", 0.0))) raw_end = raw_start + raw_dur if raw_end < start_s or raw_start > end_s: continue adjusted = dict(burst) - adjusted['start_seconds'] = round(max(0.0, raw_start - start_s), 3) - adjusted['duration_seconds'] = round(raw_dur, 3) + adjusted["start_seconds"] = round(max(0.0, raw_start - start_s), 3) + adjusted["duration_seconds"] = round(raw_dur, 3) adjusted_bursts.append(adjusted) return trimmed_duration, adjusted_bursts if adjusted_bursts else bursts @@ -718,7 +735,7 @@ class SubGhzManager: burst_last_high = 0.0 burst_peak = 0 burst_bytes = bytearray() - burst_hint_family = 'Unknown' + burst_hint_family = "Unknown" burst_hint_conf = 0.0 BURST_OFF_HOLD = 0.18 BURST_MIN_DURATION = 0.04 @@ -730,11 +747,11 @@ class SubGhzManager: on_threshold = 0.0 warmup_until = time.time() + 1.0 modulation_scores: dict[str, float] = { - 'OOK/ASK': 0.0, - 'FSK/GFSK': 0.0, - 'PWM/PPM': 0.0, + "OOK/ASK": 0.0, + "FSK/GFSK": 0.0, + "PWM/PPM": 0.0, } - last_hint_reason = '' + last_hint_reason = "" try: fd = file_handle.fileno() @@ -765,7 +782,7 @@ class SubGhzManager: if first_chunk: first_chunk = False - self._emit({'type': 'info', 'text': '[rx] Receiving IQ data...'}) + self._emit({"type": "info", "text": "[rx] Receiving IQ data..."}) now = time.time() if now - last_hint_eval >= HINT_EVAL_INTERVAL: @@ -799,7 +816,7 @@ class SubGhzManager: off_threshold = max(0.8, min(on_threshold - 0.5, noise_floor + off_delta)) rising = smooth_level - prev_smooth_level - self._emit({'type': 'rx_level', 'level': int(round(smooth_level))}) + self._emit({"type": "rx_level", "level": int(round(smooth_level))}) if not burst_active: if now >= warmup_until and smooth_level >= on_threshold and rising >= 0.35: @@ -808,25 +825,25 @@ class SubGhzManager: burst_last_high = now burst_peak = int(round(smooth_level)) burst_bytes = bytearray(data[: min(len(data), MAX_BURST_BYTES)]) - burst_hint_family = 'Unknown' + burst_hint_family = "Unknown" burst_hint_conf = 0.0 if self._rx_trigger_enabled and self._rx_trigger_first_burst_start is None: - self._rx_trigger_first_burst_start = max( - 0.0, now - self._rx_start_time + self._rx_trigger_first_burst_start = max(0.0, now - self._rx_start_time) + self._emit( + { + "type": "info", + "text": "[rx] Trigger fired - capturing burst window", + } ) - self._emit({ - 'type': 'info', - 'text': '[rx] Trigger fired - capturing burst window', - }) - self._emit({ - 'type': 'rx_burst', - 'mode': 'rx', - 'event': 'start', - 'start_offset_s': round( - max(0.0, now - self._rx_start_time), 3 - ), - 'level': int(round(smooth_level)), - }) + self._emit( + { + "type": "rx_burst", + "mode": "rx", + "event": "start", + "start_offset_s": round(max(0.0, now - self._rx_start_time), 3), + "level": int(round(smooth_level)), + } + ) else: if smooth_level >= off_threshold: burst_last_high = now @@ -840,9 +857,7 @@ class SubGhzManager: duration, ) if fp: - self._rx_fingerprint_counts[fp] = ( - self._rx_fingerprint_counts.get(fp, 0) + 1 - ) + self._rx_fingerprint_counts[fp] = self._rx_fingerprint_counts.get(fp, 0) + 1 burst_hint_family, burst_hint_conf, burst_reason = self._estimate_modulation_hint( bytes(burst_bytes) ) @@ -850,31 +865,29 @@ class SubGhzManager: modulation_scores[burst_hint_family] += burst_hint_conf * 1.8 last_hint_reason = burst_reason burst_data = { - 'start_seconds': round( - max(0.0, burst_start - self._rx_start_time), 3 - ), - 'duration_seconds': round(duration, 3), - 'peak_level': int(burst_peak), - 'fingerprint': fp, - 'modulation_hint': burst_hint_family, - 'modulation_confidence': round(float(burst_hint_conf), 3), + "start_seconds": round(max(0.0, burst_start - self._rx_start_time), 3), + "duration_seconds": round(duration, 3), + "peak_level": int(burst_peak), + "fingerprint": fp, + "modulation_hint": burst_hint_family, + "modulation_confidence": round(float(burst_hint_conf), 3), } if len(self._rx_bursts) < 512: self._rx_bursts.append(burst_data) - self._rx_trigger_last_burst_end = max( - 0.0, now - self._rx_start_time + self._rx_trigger_last_burst_end = max(0.0, now - self._rx_start_time) + self._emit( + { + "type": "rx_burst", + "mode": "rx", + "event": "end", + "start_offset_s": burst_data["start_seconds"], + "duration_ms": int(duration * 1000), + "peak_level": int(burst_peak), + "fingerprint": fp, + "modulation_hint": burst_hint_family, + "modulation_confidence": round(float(burst_hint_conf), 3), + } ) - self._emit({ - 'type': 'rx_burst', - 'mode': 'rx', - 'event': 'end', - 'start_offset_s': burst_data['start_seconds'], - 'duration_ms': int(duration * 1000), - 'peak_level': int(burst_peak), - 'fingerprint': fp, - 'modulation_hint': burst_hint_family, - 'modulation_confidence': round(float(burst_hint_conf), 3), - }) burst_active = False burst_peak = 0 burst_bytes = bytearray() @@ -888,20 +901,22 @@ class SubGhzManager: hint_conf = 0.0 if total_score <= 0 else min(0.98, best_score / total_score) protocol_hint = self._protocol_hint_from_capture( self._rx_frequency_hz, - best_family if hint_conf >= 0.3 else 'Unknown', + best_family if hint_conf >= 0.3 else "Unknown", len(self._rx_bursts), ) self._rx_protocol_hint = protocol_hint if hint_conf >= 0.30: self._rx_modulation_hint = best_family self._rx_modulation_confidence = hint_conf - self._emit({ - 'type': 'rx_hint', - 'modulation_hint': best_family, - 'confidence': round(hint_conf, 3), - 'protocol_hint': protocol_hint, - 'reason': last_hint_reason, - }) + self._emit( + { + "type": "rx_hint", + "modulation_hint": best_family, + "confidence": round(hint_conf, 3), + "protocol_hint": protocol_hint, + "reason": last_hint_reason, + } + ) last_hint_emit = now # Smart-trigger auto-stop after quiet post-roll window. @@ -912,25 +927,30 @@ class SubGhzManager: and not self._rx_autostop_pending ): last_end = self._rx_trigger_last_burst_end - if last_end is not None and (max(0.0, now - self._rx_start_time) - last_end) >= self._rx_trigger_post_s: + if ( + last_end is not None + and (max(0.0, now - self._rx_start_time) - last_end) >= self._rx_trigger_post_s + ): self._rx_autostop_pending = True - self._emit({ - 'type': 'info', - 'text': '[rx] Trigger window complete - finalizing capture', - }) + self._emit( + { + "type": "info", + "text": "[rx] Trigger window complete - finalizing capture", + } + ) threading.Thread(target=self.stop_receive, daemon=True).start() break if now - last_wave >= WAVE_INTERVAL: samples = self._extract_waveform(data) if samples: - self._emit({'type': 'rx_waveform', 'samples': samples}) + self._emit({"type": "rx_waveform", "samples": samples}) last_wave = now if now - last_spectrum >= SPECTRUM_INTERVAL: bins = self._compute_rx_spectrum(data) if bins: - self._emit({'type': 'rx_spectrum', 'bins': bins}) + self._emit({"type": "rx_spectrum", "bins": bins}) last_spectrum = now if now - last_stats >= STATS_INTERVAL: @@ -941,20 +961,26 @@ class SubGhzManager: file_size = self._rx_file.stat().st_size except OSError: file_size = 0 - self._emit({ - 'type': 'rx_stats', - 'rate_kb': round(rate_kb, 1), - 'file_size': file_size, - 'elapsed_seconds': round(time.time() - self._rx_start_time, 1) if self._rx_start_time else 0, - }) + self._emit( + { + "type": "rx_stats", + "rate_kb": round(rate_kb, 1), + "file_size": file_size, + "elapsed_seconds": round(time.time() - self._rx_start_time, 1) + if self._rx_start_time + else 0, + } + ) if now - last_log >= 5.0: - self._emit({ - 'type': 'info', - 'text': ( - f'[rx] IQ: {rate_kb:.0f} KB/s ' - f'(lvl {smooth_level:.1f}, floor {noise_floor:.1f}, thr {on_threshold:.1f})' - ), - }) + self._emit( + { + "type": "info", + "text": ( + f"[rx] IQ: {rate_kb:.0f} KB/s " + f"(lvl {smooth_level:.1f}, floor {noise_floor:.1f}, thr {on_threshold:.1f})" + ), + } + ) last_log = now bytes_since_stats = 0 last_stats = now @@ -968,9 +994,7 @@ class SubGhzManager: duration, ) if fp: - self._rx_fingerprint_counts[fp] = ( - self._rx_fingerprint_counts.get(fp, 0) + 1 - ) + self._rx_fingerprint_counts[fp] = self._rx_fingerprint_counts.get(fp, 0) + 1 burst_hint_family, burst_hint_conf, burst_reason = self._estimate_modulation_hint( bytes(burst_bytes) ) @@ -978,31 +1002,29 @@ class SubGhzManager: modulation_scores[burst_hint_family] += burst_hint_conf * 1.8 last_hint_reason = burst_reason burst_data = { - 'start_seconds': round( - max(0.0, burst_start - self._rx_start_time), 3 - ), - 'duration_seconds': round(duration, 3), - 'peak_level': int(burst_peak), - 'fingerprint': fp, - 'modulation_hint': burst_hint_family, - 'modulation_confidence': round(float(burst_hint_conf), 3), + "start_seconds": round(max(0.0, burst_start - self._rx_start_time), 3), + "duration_seconds": round(duration, 3), + "peak_level": int(burst_peak), + "fingerprint": fp, + "modulation_hint": burst_hint_family, + "modulation_confidence": round(float(burst_hint_conf), 3), } if len(self._rx_bursts) < 512: self._rx_bursts.append(burst_data) - self._rx_trigger_last_burst_end = max( - 0.0, time.time() - self._rx_start_time + self._rx_trigger_last_burst_end = max(0.0, time.time() - self._rx_start_time) + self._emit( + { + "type": "rx_burst", + "mode": "rx", + "event": "end", + "start_offset_s": burst_data["start_seconds"], + "duration_ms": int(duration * 1000), + "peak_level": int(burst_peak), + "fingerprint": fp, + "modulation_hint": burst_hint_family, + "modulation_confidence": round(float(burst_hint_conf), 3), + } ) - self._emit({ - 'type': 'rx_burst', - 'mode': 'rx', - 'event': 'end', - 'start_offset_s': burst_data['start_seconds'], - 'duration_ms': int(duration * 1000), - 'peak_level': int(burst_peak), - 'fingerprint': fp, - 'modulation_hint': burst_hint_family, - 'modulation_confidence': round(float(burst_hint_conf), 3), - }) # Finalize modulation summary for capture metadata. if modulation_scores: @@ -1126,12 +1148,12 @@ class SubGhzManager: if not process or not process.stderr: return try: - for line in iter(process.stderr.readline, b''): - text = line.decode('utf-8', errors='replace').strip() + for line in iter(process.stderr.readline, b""): + text = line.decode("utf-8", errors="replace").strip() if text: logger.debug(f"[hackrf_rx] {text}") - if 'error' in text.lower(): - self._emit({'type': 'info', 'text': f'[hackrf_rx] {text}'}) + if "error" in text.lower(): + self._emit({"type": "info", "text": f"[hackrf_rx] {text}"}) except Exception: pass @@ -1141,7 +1163,7 @@ class SubGhzManager: proc_to_terminate: subprocess.Popen | None = None with self._lock: if not self._rx_process or self._rx_process.poll() is not None: - return {'status': 'not_running'} + return {"status": "not_running"} self._rx_stop = True thread_to_join = self._rx_thread @@ -1179,7 +1201,7 @@ class SubGhzManager: bursts=bursts, ) size = iq_file.stat().st_size - dominant_fingerprint = '' + dominant_fingerprint = "" dominant_fingerprint_count = 0 for fp, count in self._rx_fingerprint_counts.items(): if count > dominant_fingerprint_count: @@ -1191,9 +1213,9 @@ class SubGhzManager: if not modulation_hint and bursts: burst_hint_totals: dict[str, float] = {} for burst in bursts: - hint_name = str(burst.get('modulation_hint') or '').strip() - hint_conf = float(burst.get('modulation_confidence') or 0.0) - if not hint_name or hint_name.lower() == 'unknown': + hint_name = str(burst.get("modulation_hint") or "").strip() + hint_conf = float(burst.get("modulation_confidence") or 0.0) + if not hint_name or hint_name.lower() == "unknown": continue burst_hint_totals[hint_name] = burst_hint_totals.get(hint_name, 0.0) + max(0.05, hint_conf) if burst_hint_totals: @@ -1227,7 +1249,7 @@ class SubGhzManager: duration_seconds=round(duration, 1), size_bytes=size, label=label, - label_source='auto', + label_source="auto", bursts=bursts, modulation_hint=modulation_hint, modulation_confidence=round(modulation_confidence, 3), @@ -1237,7 +1259,7 @@ class SubGhzManager: trigger_pre_seconds=round(self._rx_trigger_pre_s, 3), trigger_post_seconds=round(self._rx_trigger_post_s, 3), ) - meta_path = iq_file.with_suffix('.json') + meta_path = iq_file.with_suffix(".json") try: meta_path.write_text(json.dumps(capture.to_dict(), indent=2)) except OSError as e: @@ -1252,101 +1274,186 @@ class SubGhzManager: self._rx_trigger_first_burst_start = None self._rx_trigger_last_burst_end = None self._rx_autostop_pending = False - self._rx_modulation_hint = '' + self._rx_modulation_hint = "" self._rx_modulation_confidence = 0.0 - self._rx_protocol_hint = '' + self._rx_protocol_hint = "" self._rx_fingerprint_counts = {} - self._emit({ - 'type': 'status', - 'mode': 'idle', - 'status': 'stopped', - 'duration_seconds': round(duration, 1), - }) + self._emit( + { + "type": "status", + "mode": "idle", + "status": "stopped", + "duration_seconds": round(duration, 1), + } + ) - result = {'status': 'stopped', 'duration_seconds': round(duration, 1)} + result = {"status": "stopped", "duration_seconds": round(duration, 1)} if capture: - result['capture'] = capture.to_dict() + result["capture"] = capture.to_dict() return result # ------------------------------------------------------------------ # DECODE (hackrf_transfer piped to rtl_433) # ------------------------------------------------------------------ - def start_decode( - self, - frequency_hz: int, - sample_rate: int = 2_000_000, + def start_decode( + self, + frequency_hz: int, + sample_rate: int = 2_000_000, lna_gain: int = 32, vga_gain: int = 20, - decode_profile: str = 'weather', - device_serial: str | None = None, - ) -> dict: - # Pre-lock: tool availability & device detection (blocking I/O) - hackrf_transfer_path = self._resolve_tool('hackrf_transfer') - if not hackrf_transfer_path: - return {'status': 'error', 'message': 'hackrf_transfer not found'} - rtl433_path = self._resolve_tool('rtl_433') - if not rtl433_path: - return {'status': 'error', 'message': 'rtl_433 not found'} - device_err = self._require_hackrf_device() - if device_err: - return {'status': 'error', 'message': device_err} + decode_profile: str = "weather", + device_serial: str | None = None, + ) -> dict: + # Pre-lock: tool availability & device detection (blocking I/O) + hackrf_transfer_path = self._resolve_tool("hackrf_transfer") + if not hackrf_transfer_path: + return {"status": "error", "message": "hackrf_transfer not found"} + rtl433_path = self._resolve_tool("rtl_433") + if not rtl433_path: + return {"status": "error", "message": "rtl_433 not found"} + device_err = self._require_hackrf_device() + if device_err: + return {"status": "error", "message": device_err} with self._lock: - if self.active_mode != 'idle': - return {'status': 'error', 'message': f'Already running: {self.active_mode}'} + if self.active_mode != "idle": + return {"status": "error", "message": f"Already running: {self.active_mode}"} # Keep decode bandwidth conservative for stability. 2 Msps is enough # for common SubGHz protocols while staying within HackRF support. requested_sample_rate = int(sample_rate) stable_sample_rate = max(2_000_000, min(2_000_000, requested_sample_rate)) - # Build hackrf_transfer command (producer: raw IQ to stdout) - hackrf_cmd = [ - hackrf_transfer_path, - '-r', '-', - '-f', str(frequency_hz), - '-s', str(stable_sample_rate), - '-l', str(max(SUBGHZ_LNA_GAIN_MIN, min(SUBGHZ_LNA_GAIN_MAX, lna_gain))), - '-g', str(max(SUBGHZ_VGA_GAIN_MIN, min(SUBGHZ_VGA_GAIN_MAX, vga_gain))), + # Build hackrf_transfer command (producer: raw IQ to stdout) + hackrf_cmd = [ + hackrf_transfer_path, + "-r", + "-", + "-f", + str(frequency_hz), + "-s", + str(stable_sample_rate), + "-l", + str(max(SUBGHZ_LNA_GAIN_MIN, min(SUBGHZ_LNA_GAIN_MAX, lna_gain))), + "-g", + str(max(SUBGHZ_VGA_GAIN_MIN, min(SUBGHZ_VGA_GAIN_MAX, vga_gain))), ] if device_serial: - hackrf_cmd.extend(['-d', device_serial]) + hackrf_cmd.extend(["-d", device_serial]) - # Build rtl_433 command (consumer: reads IQ from stdin) - # Feed signed 8-bit complex IQ directly from hackrf_transfer. - rtl433_cmd = [ - rtl433_path, - '-r', 'cs8:-', - '-s', str(stable_sample_rate), - '-f', str(frequency_hz), - '-F', 'json', - '-F', 'log', - '-M', 'level', - '-M', 'noise:5', - '-Y', 'autolevel', - '-Y', 'ampest', - '-Y', 'minsnr=2.5', + # Build rtl_433 command (consumer: reads IQ from stdin) + # Feed signed 8-bit complex IQ directly from hackrf_transfer. + rtl433_cmd = [ + rtl433_path, + "-r", + "cs8:-", + "-s", + str(stable_sample_rate), + "-f", + str(frequency_hz), + "-F", + "json", + "-F", + "log", + "-M", + "level", + "-M", + "noise:5", + "-Y", + "autolevel", + "-Y", + "ampest", + "-Y", + "minsnr=2.5", ] - profile = (decode_profile or 'weather').strip().lower() - if profile == 'weather': + profile = (decode_profile or "weather").strip().lower() + if profile == "weather": # Limit decoder set to weather/temperature/humidity/rain/wind # protocols for better sensitivity and lower CPU load. weather_protocol_ids = [ - 2, 3, 8, 12, 16, 18, 19, 20, 31, 32, 34, 40, 47, 50, 52, - 54, 55, 56, 57, 69, 73, 74, 75, 76, 78, 79, 85, 91, 92, - 108, 109, 111, 112, 113, 119, 120, 124, 127, 132, 133, - 134, 138, 141, 143, 144, 145, 146, 147, 152, 153, 157, - 158, 163, 165, 166, 170, 171, 172, 173, 175, 182, 183, - 184, 194, 195, 196, 205, 206, 213, 214, 215, 217, 219, - 221, 222, + 2, + 3, + 8, + 12, + 16, + 18, + 19, + 20, + 31, + 32, + 34, + 40, + 47, + 50, + 52, + 54, + 55, + 56, + 57, + 69, + 73, + 74, + 75, + 76, + 78, + 79, + 85, + 91, + 92, + 108, + 109, + 111, + 112, + 113, + 119, + 120, + 124, + 127, + 132, + 133, + 134, + 138, + 141, + 143, + 144, + 145, + 146, + 147, + 152, + 153, + 157, + 158, + 163, + 165, + 166, + 170, + 171, + 172, + 173, + 175, + 182, + 183, + 184, + 194, + 195, + 196, + 205, + 206, + 213, + 214, + 215, + 217, + 219, + 221, + 222, ] - rtl433_cmd.extend(['-R', '0']) + rtl433_cmd.extend(["-R", "0"]) for proto_id in weather_protocol_ids: - rtl433_cmd.extend(['-R', str(proto_id)]) + rtl433_cmd.extend(["-R", str(proto_id)]) else: - profile = 'all' + profile = "all" logger.info(f"SubGHz decode: {' '.join(hackrf_cmd)} | {' '.join(rtl433_cmd)}") @@ -1377,15 +1484,17 @@ class SubGhzManager: self._decode_frequency_hz = frequency_hz self._decode_sample_rate = stable_sample_rate self._decode_stop = False - self._emit({'type': 'info', 'text': f'[decode] Profile: {profile}'}) + self._emit({"type": "info", "text": f"[decode] Profile: {profile}"}) if requested_sample_rate != stable_sample_rate: - self._emit({ - 'type': 'info', - 'text': ( - f'[decode] Using {stable_sample_rate} sps ' - f'(requested {requested_sample_rate}) for stable live decode' - ), - }) + self._emit( + { + "type": "info", + "text": ( + f"[decode] Using {stable_sample_rate} sps " + f"(requested {requested_sample_rate}) for stable live decode" + ), + } + ) # Buffered relay: hackrf stdout → queue → rtl_433 stdin # with auto-restart when HackRF USB disconnects. @@ -1420,18 +1529,20 @@ class SubGhzManager: daemon=True, ).start() - self._emit({ - 'type': 'status', - 'mode': 'decode', - 'status': 'started', - 'frequency_hz': frequency_hz, - 'sample_rate': stable_sample_rate, - }) + self._emit( + { + "type": "status", + "mode": "decode", + "status": "started", + "frequency_hz": frequency_hz, + "sample_rate": stable_sample_rate, + } + ) return { - 'status': 'started', - 'frequency_hz': frequency_hz, - 'sample_rate': stable_sample_rate, + "status": "started", + "frequency_hz": frequency_hz, + "sample_rate": stable_sample_rate, } except FileNotFoundError as e: @@ -1439,7 +1550,7 @@ class SubGhzManager: safe_terminate(self._decode_hackrf_process) unregister_process(self._decode_hackrf_process) self._decode_hackrf_process = None - return {'status': 'error', 'message': f'Tool not found: {e.filename or "unknown"}'} + return {"status": "error", "message": f"Tool not found: {e.filename or 'unknown'}"} except Exception as e: for proc in (self._decode_hackrf_process, self._decode_process): if proc: @@ -1448,7 +1559,7 @@ class SubGhzManager: self._decode_hackrf_process = None self._decode_process = None logger.error(f"Failed to start decode: {e}") - return {'status': 'error', 'message': str(e)} + return {"status": "error", "message": str(e)} def _hackrf_reader( self, @@ -1465,9 +1576,9 @@ class SubGhzManager: Uses os.read() on the raw fd to drain the pipe immediately (no Python buffering), minimising backpressure on the USB transfer path. """ - CHUNK = 65536 # 64 KB read size for lower latency - RESTART_DELAY = 0.15 # seconds before restart attempt - MAX_RESTARTS = 3600 # allow longer sessions + CHUNK = 65536 # 64 KB read size for lower latency + RESTART_DELAY = 0.15 # seconds before restart attempt + MAX_RESTARTS = 3600 # allow longer sessions MAX_QUICK_RESTARTS = 6 QUICK_RESTART_WINDOW = 20.0 @@ -1487,7 +1598,7 @@ class SubGhzManager: if not src or (hackrf_proc and hackrf_proc.poll() is not None): if restarts >= MAX_RESTARTS: logger.error("hackrf_transfer: max restarts reached") - self._emit({'type': 'error', 'message': 'HackRF: max restarts reached'}) + self._emit({"type": "error", "message": "HackRF: max restarts reached"}) break # Unregister the dead process before restarting @@ -1522,16 +1633,18 @@ class SubGhzManager: restart_times.append(now) restart_times = [t for t in restart_times if (now - t) <= QUICK_RESTART_WINDOW] if len(restart_times) >= MAX_QUICK_RESTARTS: - self._emit({ - 'type': 'error', - 'message': ( - 'HackRF stream is unstable (restarting repeatedly). ' - 'Try lower gain/sample-rate or reconnect the device.' - ), - }) + self._emit( + { + "type": "error", + "message": ( + "HackRF stream is unstable (restarting repeatedly). " + "Try lower gain/sample-rate or reconnect the device." + ), + } + ) break logger.info(f"hackrf_transfer restarted ({restarts})") - self._emit({'type': 'info', 'text': f'[decode] HackRF stream restarted ({restarts})'}) + self._emit({"type": "info", "text": f"[decode] HackRF stream restarted ({restarts})"}) threading.Thread( target=self._monitor_decode_hackrf_stderr, args=(hackrf_proc,), @@ -1539,10 +1652,12 @@ class SubGhzManager: ).start() except Exception as e: logger.error(f"Failed to restart hackrf_transfer: {e}") - self._emit({ - 'type': 'error', - 'message': f'Failed to restart hackrf_transfer: {e}', - }) + self._emit( + { + "type": "error", + "message": f"Failed to restart hackrf_transfer: {e}", + } + ) break if not src: @@ -1565,11 +1680,11 @@ class SubGhzManager: data = os.read(fd, CHUNK) if not data: if hackrf_proc and hackrf_proc.poll() is not None: - self._emit({'type': 'info', 'text': '[decode] HackRF stream stopped'}) + self._emit({"type": "info", "text": "[decode] HackRF stream stopped"}) break if first_chunk: first_chunk = False - self._emit({'type': 'info', 'text': '[decode] IQ source active'}) + self._emit({"type": "info", "text": "[decode] IQ source active"}) try: iq_queue.put_nowait(data) except queue.Full: @@ -1631,37 +1746,37 @@ class SubGhzManager: if now - last_level >= LEVEL_INTERVAL: level = self._compute_rx_level(data) - self._emit({'type': 'decode_level', 'level': level}) + self._emit({"type": "decode_level", "level": level}) if level >= BURST_ON_LEVEL: burst_last_high = now if not burst_active: burst_active = True burst_start = now burst_peak = level - self._emit({ - 'type': 'rx_burst', - 'mode': 'decode', - 'event': 'start', - 'start_offset_s': round( - max(0.0, now - self._decode_start_time), 3 - ), - 'level': int(level), - }) + self._emit( + { + "type": "rx_burst", + "mode": "decode", + "event": "start", + "start_offset_s": round(max(0.0, now - self._decode_start_time), 3), + "level": int(level), + } + ) else: burst_peak = max(burst_peak, level) elif burst_active and (now - burst_last_high) >= BURST_OFF_HOLD: duration = now - burst_start if duration >= BURST_MIN_DURATION: - self._emit({ - 'type': 'rx_burst', - 'mode': 'decode', - 'event': 'end', - 'start_offset_s': round( - max(0.0, burst_start - self._decode_start_time), 3 - ), - 'duration_ms': int(duration * 1000), - 'peak_level': int(burst_peak), - }) + self._emit( + { + "type": "rx_burst", + "mode": "decode", + "event": "end", + "start_offset_s": round(max(0.0, burst_start - self._decode_start_time), 3), + "duration_ms": int(duration * 1000), + "peak_level": int(burst_peak), + } + ) burst_active = False burst_peak = 0 last_level = now @@ -1669,13 +1784,13 @@ class SubGhzManager: if now - last_wave >= WAVE_INTERVAL: samples = self._extract_waveform(data, points=160) if samples: - self._emit({'type': 'decode_waveform', 'samples': samples}) + self._emit({"type": "decode_waveform", "samples": samples}) last_wave = now if now - last_spectrum >= SPECTRUM_INTERVAL: bins = self._compute_rx_spectrum(data, bins=128) if bins: - self._emit({'type': 'decode_spectrum', 'bins': bins}) + self._emit({"type": "decode_spectrum", "bins": bins}) last_spectrum = now # Pass HackRF cs8 IQ bytes through directly. @@ -1688,45 +1803,51 @@ class SubGhzManager: if first_chunk: first_chunk = False logger.info(f"IQ data flowing to rtl_433 ({len(data)} bytes)") - self._emit({ - 'type': 'info', - 'text': '[decode] Receiving IQ data from HackRF...', - }) + self._emit( + { + "type": "info", + "text": "[decode] Receiving IQ data from HackRF...", + } + ) elapsed = now - last_stats if elapsed >= STATS_INTERVAL: rate_kb = bytes_since_stats / elapsed / 1024 - self._emit({ - 'type': 'info', - 'text': f'[decode] IQ: {rate_kb:.0f} KB/s — listening for signals...', - }) - self._emit({ - 'type': 'decode_raw', - 'text': f'IQ stream active: {rate_kb:.0f} KB/s', - }) + self._emit( + { + "type": "info", + "text": f"[decode] IQ: {rate_kb:.0f} KB/s — listening for signals...", + } + ) + self._emit( + { + "type": "decode_raw", + "text": f"IQ stream active: {rate_kb:.0f} KB/s", + } + ) bytes_since_stats = 0 last_stats = now except (BrokenPipeError, OSError) as e: logger.debug(f"rtl_433 writer pipe closed: {e}") - self._emit({'type': 'info', 'text': f'[decode] Writer pipe closed: {e}'}) + self._emit({"type": "info", "text": f"[decode] Writer pipe closed: {e}"}) except Exception as e: logger.error(f"rtl_433 writer error: {e}") - self._emit({'type': 'error', 'message': f'Decode writer error: {e}'}) + self._emit({"type": "error", "message": f"Decode writer error: {e}"}) finally: if burst_active: duration = max(0.0, time.time() - burst_start) if duration >= BURST_MIN_DURATION: - self._emit({ - 'type': 'rx_burst', - 'mode': 'decode', - 'event': 'end', - 'start_offset_s': round( - max(0.0, burst_start - self._decode_start_time), 3 - ), - 'duration_ms': int(duration * 1000), - 'peak_level': int(burst_peak), - }) + self._emit( + { + "type": "rx_burst", + "mode": "decode", + "event": "end", + "start_offset_s": round(max(0.0, burst_start - self._decode_start_time), 3), + "duration_ms": int(duration * 1000), + "peak_level": int(burst_peak), + } + ) with contextlib.suppress(OSError): dst.close() @@ -1736,8 +1857,8 @@ class SubGhzManager: return got_output = False try: - for line in iter(process.stdout.readline, b''): - text = line.decode('utf-8', errors='replace').strip() + for line in iter(process.stdout.readline, b""): + text = line.decode("utf-8", errors="replace").strip() if not text: continue if not got_output: @@ -1745,10 +1866,10 @@ class SubGhzManager: logger.info("rtl_433 producing output") try: data = json.loads(text) - data['type'] = 'decode' + data["type"] = "decode" self._emit(data) except json.JSONDecodeError: - self._emit({'type': 'decode_raw', 'text': text}) + self._emit({"type": "decode_raw", "text": text}) except Exception as e: logger.error(f"Error reading decode output: {e}") finally: @@ -1756,62 +1877,62 @@ class SubGhzManager: unregister_process(process) if rc is not None and rc != 0 and rc != -15: logger.warning(f"rtl_433 exited with code {rc}") - self._emit({ - 'type': 'info', - 'text': f'[rtl_433] Exited with code {rc}', - }) + self._emit( + { + "type": "info", + "text": f"[rtl_433] Exited with code {rc}", + } + ) with self._lock: if self._decode_process is process: self._decode_process = None self._decode_frequency_hz = 0 self._decode_sample_rate = 0 self._decode_start_time = 0 - self._emit({ - 'type': 'status', - 'mode': 'idle', - 'status': 'decode_stopped', - }) + self._emit( + { + "type": "status", + "mode": "idle", + "status": "decode_stopped", + } + ) def _monitor_decode_hackrf_stderr(self, process: subprocess.Popen) -> None: if not process or not process.stderr: return fatal_disconnect_emitted = False try: - for line in iter(process.stderr.readline, b''): - text = line.decode('utf-8', errors='replace').strip() + for line in iter(process.stderr.readline, b""): + text = line.decode("utf-8", errors="replace").strip() if not text: continue logger.debug(f"[hackrf_decode] {text}") lower = text.lower() - if ( - not fatal_disconnect_emitted - and ( - 'no such device' in lower - or 'device not found' in lower - or 'disconnected' in lower - ) + if not fatal_disconnect_emitted and ( + "no such device" in lower or "device not found" in lower or "disconnected" in lower ): fatal_disconnect_emitted = True self._hackrf_device_cache = False self._hackrf_device_cache_ts = time.time() self._decode_stop = True - self._emit({ - 'type': 'error', - 'message': ( - 'HackRF disconnected during decode. ' - 'Reconnect the device, then press Start again.' - ), - }) + self._emit( + { + "type": "error", + "message": ( + "HackRF disconnected during decode. Reconnect the device, then press Start again." + ), + } + ) if ( - 'error' in lower - or 'usb' in lower - or 'overflow' in lower - or 'underflow' in lower - or 'failed' in lower - or 'couldn' in lower - or 'transfer' in lower + "error" in lower + or "usb" in lower + or "overflow" in lower + or "underflow" in lower + or "failed" in lower + or "couldn" in lower + or "transfer" in lower ): - self._emit({'type': 'info', 'text': f'[hackrf] {text}'}) + self._emit({"type": "info", "text": f"[hackrf] {text}"}) except Exception: pass @@ -1820,18 +1941,28 @@ class SubGhzManager: if not process or not process.stderr: return decode_keywords = ( - 'pulse', 'sync', 'message', 'decoded', 'snr', 'rssi', - 'level', 'modulation', 'bitbuffer', 'symbol', 'short', - 'noise', 'detected', + "pulse", + "sync", + "message", + "decoded", + "snr", + "rssi", + "level", + "modulation", + "bitbuffer", + "symbol", + "short", + "noise", + "detected", ) try: - for line in iter(process.stderr.readline, b''): - text = line.decode('utf-8', errors='replace').strip() + for line in iter(process.stderr.readline, b""): + text = line.decode("utf-8", errors="replace").strip() if text: logger.debug(f"[rtl_433] {text}") - self._emit({'type': 'info', 'text': f'[rtl_433] {text}'}) + self._emit({"type": "info", "text": f"[rtl_433] {text}"}) if any(k in text.lower() for k in decode_keywords): - self._emit({'type': 'decode_raw', 'text': text}) + self._emit({"type": "decode_raw", "text": text}) except Exception: pass @@ -1840,17 +1971,11 @@ class SubGhzManager: rtl433_proc: subprocess.Popen | None = None with self._lock: - hackrf_running = ( - self._decode_hackrf_process - and self._decode_hackrf_process.poll() is None - ) - rtl433_running = ( - self._decode_process - and self._decode_process.poll() is None - ) + hackrf_running = self._decode_hackrf_process and self._decode_hackrf_process.poll() is None + rtl433_running = self._decode_process and self._decode_process.poll() is None if not hackrf_running and not rtl433_running: - return {'status': 'not_running'} + return {"status": "not_running"} # Signal reader thread to stop before killing processes, # preventing it from spawning a new hackrf_transfer during cleanup. @@ -1885,13 +2010,15 @@ class SubGhzManager: safe_terminate(race_proc) unregister_process(race_proc) - self._emit({ - 'type': 'status', - 'mode': 'idle', - 'status': 'stopped', - }) + self._emit( + { + "type": "status", + "mode": "idle", + "status": "stopped", + } + ) - return {'status': 'stopped'} + return {"status": "stopped"} # ------------------------------------------------------------------ # TRANSMIT (replay via hackrf_transfer -t) @@ -1907,10 +2034,8 @@ class SubGhzManager: for band_low, band_high in SUBGHZ_TX_ALLOWED_BANDS: if band_low <= freq_mhz <= band_high: return None - bands_str = ', '.join( - f'{lo}-{hi} MHz' for lo, hi in SUBGHZ_TX_ALLOWED_BANDS - ) - return f'Frequency {freq_mhz:.3f} MHz is outside allowed TX bands: {bands_str}' + bands_str = ", ".join(f"{lo}-{hi} MHz" for lo, hi in SUBGHZ_TX_ALLOWED_BANDS) + return f"Frequency {freq_mhz:.3f} MHz is outside allowed TX bands: {bands_str}" @staticmethod def _estimate_capture_duration_seconds(capture: SubGhzCapture, file_size: int) -> float: @@ -1931,38 +2056,38 @@ class SubGhzManager: except OSError as exc: logger.debug(f"Failed to remove TX temp file {path}: {exc}") - def transmit( - self, - capture_id: str, - tx_gain: int = 20, + def transmit( + self, + capture_id: str, + tx_gain: int = 20, max_duration: int = 10, start_seconds: float | None = None, - duration_seconds: float | None = None, - device_serial: str | None = None, - ) -> dict: - # Pre-lock: tool availability & device detection (blocking I/O) - hackrf_transfer_path = self._resolve_tool('hackrf_transfer') - if not hackrf_transfer_path: - return {'status': 'error', 'message': 'hackrf_transfer not found'} - device_err = self._require_hackrf_device() - if device_err: - return {'status': 'error', 'message': device_err} + duration_seconds: float | None = None, + device_serial: str | None = None, + ) -> dict: + # Pre-lock: tool availability & device detection (blocking I/O) + hackrf_transfer_path = self._resolve_tool("hackrf_transfer") + if not hackrf_transfer_path: + return {"status": "error", "message": "hackrf_transfer not found"} + device_err = self._require_hackrf_device() + if device_err: + return {"status": "error", "message": device_err} # Pre-lock: capture lookup, validation, and segment I/O (can be large) capture = self._load_capture(capture_id) if not capture: - return {'status': 'error', 'message': f'Capture not found: {capture_id}'} + return {"status": "error", "message": f"Capture not found: {capture_id}"} freq_error = self.validate_tx_frequency(capture.frequency_hz) if freq_error: - return {'status': 'error', 'message': freq_error} + return {"status": "error", "message": freq_error} tx_gain = max(SUBGHZ_TX_VGA_GAIN_MIN, min(SUBGHZ_TX_VGA_GAIN_MAX, tx_gain)) max_duration = max(1, min(SUBGHZ_TX_MAX_DURATION, max_duration)) iq_path = self._captures_dir / capture.filename if not iq_path.exists(): - return {'status': 'error', 'message': 'IQ file missing'} + return {"status": "error", "message": "IQ file missing"} # Build segment file outside lock (potentially megabytes of read/write) tx_path = iq_path @@ -1972,37 +2097,37 @@ class SubGhzManager: try: start_s = max(0.0, float(start_seconds or 0.0)) except (TypeError, ValueError): - return {'status': 'error', 'message': 'Invalid start_seconds'} + return {"status": "error", "message": "Invalid start_seconds"} try: seg_s = None if duration_seconds is None else float(duration_seconds) except (TypeError, ValueError): - return {'status': 'error', 'message': 'Invalid duration_seconds'} + return {"status": "error", "message": "Invalid duration_seconds"} if seg_s is not None and seg_s <= 0: - return {'status': 'error', 'message': 'duration_seconds must be greater than 0'} + return {"status": "error", "message": "duration_seconds must be greater than 0"} file_size = iq_path.stat().st_size total_duration = self._estimate_capture_duration_seconds(capture, file_size) if total_duration <= 0: - return {'status': 'error', 'message': 'Unable to determine capture duration for segment TX'} + return {"status": "error", "message": "Unable to determine capture duration for segment TX"} if start_s >= total_duration: - return {'status': 'error', 'message': 'start_seconds is beyond end of capture'} + return {"status": "error", "message": "start_seconds is beyond end of capture"} end_s = total_duration if seg_s is None else min(total_duration, start_s + seg_s) if end_s <= start_s: - return {'status': 'error', 'message': 'Selected segment is empty'} + return {"status": "error", "message": "Selected segment is empty"} bytes_per_second = max(2, int(capture.sample_rate) * 2) start_byte = int(start_s * bytes_per_second) & ~1 end_byte = int(end_s * bytes_per_second) & ~1 if end_byte <= start_byte: - return {'status': 'error', 'message': 'Selected segment is too short'} + return {"status": "error", "message": "Selected segment is too short"} segment_size = end_byte - start_byte segment_name = f".txseg_{capture.capture_id}_{uuid.uuid4().hex[:8]}.iq" segment_path = self._captures_dir / segment_name segment_path_for_cleanup = segment_path try: - with open(iq_path, 'rb') as src, open(segment_path, 'wb') as dst: + with open(iq_path, "rb") as src, open(segment_path, "wb") as dst: src.seek(start_byte) remaining = segment_size while remaining > 0: @@ -2014,46 +2139,50 @@ class SubGhzManager: written = segment_path.stat().st_size if segment_path.exists() else 0 except OSError as exc: logger.error(f"Failed to build TX segment: {exc}") - return {'status': 'error', 'message': 'Failed to create TX segment'} + return {"status": "error", "message": "Failed to create TX segment"} if written < 2: try: segment_path.unlink(missing_ok=True) # type: ignore[arg-type] except Exception: pass - return {'status': 'error', 'message': 'Selected TX segment has no IQ data'} + return {"status": "error", "message": "Selected TX segment has no IQ data"} tx_path = segment_path segment_info = { - 'start_seconds': round(start_s, 3), - 'duration_seconds': round(written / bytes_per_second, 3), - 'bytes': int(written), + "start_seconds": round(start_s, 3), + "duration_seconds": round(written / bytes_per_second, 3), + "bytes": int(written), } with self._lock: - if self.active_mode != 'idle': + if self.active_mode != "idle": # Clean up segment file if we prepared one if segment_path_for_cleanup: try: segment_path_for_cleanup.unlink(missing_ok=True) # type: ignore[arg-type] except Exception: pass - return {'status': 'error', 'message': f'Already running: {self.active_mode}'} + return {"status": "error", "message": f"Already running: {self.active_mode}"} # Clear any orphaned temp segment from a previous TX attempt. self._cleanup_tx_temp_file() - if segment_path_for_cleanup: - self._tx_temp_file = segment_path_for_cleanup - - cmd = [ - hackrf_transfer_path, - '-t', str(tx_path), - '-f', str(capture.frequency_hz), - '-s', str(capture.sample_rate), - '-x', str(tx_gain), + if segment_path_for_cleanup: + self._tx_temp_file = segment_path_for_cleanup + + cmd = [ + hackrf_transfer_path, + "-t", + str(tx_path), + "-f", + str(capture.frequency_hz), + "-s", + str(capture.sample_rate), + "-x", + str(tx_gain), ] if device_serial: - cmd.extend(['-d', device_serial]) + cmd.extend(["-d", device_serial]) logger.info(f"SubGHz TX: {' '.join(cmd)}") @@ -2068,9 +2197,7 @@ class SubGhzManager: self._tx_capture_id = capture_id # Start watchdog timer - self._tx_watchdog = threading.Timer( - max_duration, self._tx_watchdog_kill - ) + self._tx_watchdog = threading.Timer(max_duration, self._tx_watchdog_kill) self._tx_watchdog.daemon = True self._tx_watchdog.start() @@ -2080,30 +2207,32 @@ class SubGhzManager: daemon=True, ).start() - self._emit({ - 'type': 'tx_status', - 'status': 'transmitting', - 'capture_id': capture_id, - 'frequency_hz': capture.frequency_hz, - 'max_duration': max_duration, - 'segment': segment_info, - }) + self._emit( + { + "type": "tx_status", + "status": "transmitting", + "capture_id": capture_id, + "frequency_hz": capture.frequency_hz, + "max_duration": max_duration, + "segment": segment_info, + } + ) return { - 'status': 'transmitting', - 'capture_id': capture_id, - 'frequency_hz': capture.frequency_hz, - 'max_duration': max_duration, - 'segment': segment_info, + "status": "transmitting", + "capture_id": capture_id, + "frequency_hz": capture.frequency_hz, + "max_duration": max_duration, + "segment": segment_info, } except FileNotFoundError: self._cleanup_tx_temp_file() - return {'status': 'error', 'message': 'hackrf_transfer not found'} + return {"status": "error", "message": "hackrf_transfer not found"} except Exception as e: self._cleanup_tx_temp_file() logger.error(f"Failed to start TX: {e}") - return {'status': 'error', 'message': str(e)} + return {"status": "error", "message": str(e)} def _tx_watchdog_kill(self) -> None: """Kill TX process when max duration is exceeded.""" @@ -2127,18 +2256,22 @@ class SubGhzManager: if returncode and returncode != 0 and returncode != -15: # Non-zero exit (not SIGTERM) means unexpected death logger.warning(f"hackrf_transfer TX exited unexpectedly (rc={returncode})") - self._emit({ - 'type': 'error', - 'message': f'Transmission failed (hackrf_transfer exited with code {returncode})', - }) + self._emit( + { + "type": "error", + "message": f"Transmission failed (hackrf_transfer exited with code {returncode})", + } + ) self._tx_process = None self._tx_start_time = 0 - self._tx_capture_id = '' - self._emit({ - 'type': 'tx_status', - 'status': 'tx_complete', - 'duration_seconds': round(duration, 1), - }) + self._tx_capture_id = "" + self._emit( + { + "type": "tx_status", + "status": "tx_complete", + "duration_seconds": round(duration, 1), + } + ) if self._tx_watchdog: self._tx_watchdog.cancel() self._tx_watchdog = None @@ -2153,13 +2286,13 @@ class SubGhzManager: if not self._tx_process or self._tx_process.poll() is not None: self._cleanup_tx_temp_file() - return {'status': 'not_running'} + return {"status": "not_running"} proc_to_terminate = self._tx_process self._tx_process = None duration = time.time() - self._tx_start_time if self._tx_start_time else 0 self._tx_start_time = 0 - self._tx_capture_id = '' + self._tx_capture_id = "" self._cleanup_tx_temp_file() # Terminate outside lock to avoid blocking other operations @@ -2167,50 +2300,54 @@ class SubGhzManager: safe_terminate(proc_to_terminate) unregister_process(proc_to_terminate) - self._emit({ - 'type': 'tx_status', - 'status': 'tx_stopped', - 'duration_seconds': round(duration, 1), - }) + self._emit( + { + "type": "tx_status", + "status": "tx_stopped", + "duration_seconds": round(duration, 1), + } + ) - return {'status': 'stopped', 'duration_seconds': round(duration, 1)} + return {"status": "stopped", "duration_seconds": round(duration, 1)} # ------------------------------------------------------------------ # SWEEP (hackrf_sweep) # ------------------------------------------------------------------ - def start_sweep( - self, - freq_start_mhz: float = 300.0, - freq_end_mhz: float = 928.0, - bin_width: int = 100000, - device_serial: str | None = None, - ) -> dict: - # Pre-lock: tool availability & device detection (blocking I/O) - hackrf_sweep_path = self._resolve_tool('hackrf_sweep') - if not hackrf_sweep_path: - return {'status': 'error', 'message': 'hackrf_sweep not found'} - device_err = self._require_hackrf_device() - if device_err: - return {'status': 'error', 'message': device_err} + def start_sweep( + self, + freq_start_mhz: float = 300.0, + freq_end_mhz: float = 928.0, + bin_width: int = 100000, + device_serial: str | None = None, + ) -> dict: + # Pre-lock: tool availability & device detection (blocking I/O) + hackrf_sweep_path = self._resolve_tool("hackrf_sweep") + if not hackrf_sweep_path: + return {"status": "error", "message": "hackrf_sweep not found"} + device_err = self._require_hackrf_device() + if device_err: + return {"status": "error", "message": device_err} # Wait for previous sweep thread to exit (blocking) before lock if self._sweep_thread and self._sweep_thread.is_alive(): self._sweep_thread.join(timeout=2.0) if self._sweep_thread.is_alive(): - return {'status': 'error', 'message': 'Previous sweep still shutting down'} + return {"status": "error", "message": "Previous sweep still shutting down"} with self._lock: - if self.active_mode != 'idle': - return {'status': 'error', 'message': f'Already running: {self.active_mode}'} - - cmd = [ - hackrf_sweep_path, - '-f', f'{int(freq_start_mhz)}:{int(freq_end_mhz)}', - '-w', str(bin_width), - ] + if self.active_mode != "idle": + return {"status": "error", "message": f"Already running: {self.active_mode}"} + + cmd = [ + hackrf_sweep_path, + "-f", + f"{int(freq_start_mhz)}:{int(freq_end_mhz)}", + "-w", + str(bin_width), + ] if device_serial: - cmd.extend(['-d', device_serial]) + cmd.extend(["-d", device_serial]) logger.info(f"SubGHz sweep: {' '.join(cmd)}") @@ -2231,25 +2368,27 @@ class SubGhzManager: ) self._sweep_thread.start() - self._emit({ - 'type': 'status', - 'mode': 'sweep', - 'status': 'started', - 'freq_start_mhz': freq_start_mhz, - 'freq_end_mhz': freq_end_mhz, - }) + self._emit( + { + "type": "status", + "mode": "sweep", + "status": "started", + "freq_start_mhz": freq_start_mhz, + "freq_end_mhz": freq_end_mhz, + } + ) return { - 'status': 'started', - 'freq_start_mhz': freq_start_mhz, - 'freq_end_mhz': freq_end_mhz, + "status": "started", + "freq_start_mhz": freq_start_mhz, + "freq_end_mhz": freq_end_mhz, } except FileNotFoundError: - return {'status': 'error', 'message': 'hackrf_sweep not found'} + return {"status": "error", "message": "hackrf_sweep not found"} except Exception as e: logger.error(f"Failed to start sweep: {e}") - return {'status': 'error', 'message': str(e)} + return {"status": "error", "message": str(e)} def _sweep_loop(self, cmd: list[str]) -> None: """Run hackrf_sweep with auto-restart on USB drops.""" @@ -2265,7 +2404,7 @@ class SubGhzManager: break if restarts >= MAX_RESTARTS: logger.error("hackrf_sweep: max restarts reached") - self._emit({'type': 'error', 'message': 'HackRF sweep: max restarts reached'}) + self._emit({"type": "error", "message": "HackRF sweep: max restarts reached"}) break time.sleep(RESTART_DELAY) @@ -2287,11 +2426,13 @@ class SubGhzManager: break self._sweep_running = False - self._emit({ - 'type': 'status', - 'mode': 'idle', - 'status': 'sweep_stopped', - }) + self._emit( + { + "type": "status", + "mode": "idle", + "status": "sweep_stopped", + } + ) def _parse_sweep_stdout(self) -> None: """Parse hackrf_sweep CSV output into SweepPoint events. @@ -2303,14 +2444,14 @@ class SubGhzManager: if not process or not process.stdout: return try: - for line in iter(process.stdout.readline, b''): + for line in iter(process.stdout.readline, b""): if not self._sweep_running: break - text = line.decode('utf-8', errors='replace').strip() + text = line.decode("utf-8", errors="replace").strip() if not text: continue try: - parts = text.split(',') + parts = text.split(",") if len(parts) < 7: continue hz_low = float(parts[2].strip()) @@ -2323,15 +2464,19 @@ class SubGhzManager: points = [] for i, power in enumerate(powers): freq_hz = hz_low + i * hz_bin_width - points.append({ - 'freq': round(freq_hz / 1_000_000, 4), - 'power': round(power, 1), - }) + points.append( + { + "freq": round(freq_hz / 1_000_000, 4), + "power": round(power, 1), + } + ) - self._emit({ - 'type': 'sweep', - 'points': points, - }) + self._emit( + { + "type": "sweep", + "points": points, + } + ) except Exception as exc: logger.debug(f"Skipping malformed sweep line: {exc}") continue @@ -2343,7 +2488,7 @@ class SubGhzManager: with self._lock: self._sweep_running = False if not self._sweep_process or self._sweep_process.poll() is not None: - return {'status': 'not_running'} + return {"status": "not_running"} proc_to_terminate = self._sweep_process self._sweep_process = None @@ -2357,13 +2502,15 @@ class SubGhzManager: if self._sweep_thread and self._sweep_thread.is_alive(): self._sweep_thread.join(timeout=2.0) - self._emit({ - 'type': 'status', - 'mode': 'idle', - 'status': 'stopped', - }) + self._emit( + { + "type": "status", + "mode": "idle", + "status": "stopped", + } + ) - return {'status': 'stopped'} + return {"status": "stopped"} # ------------------------------------------------------------------ # CAPTURE LIBRARY @@ -2371,53 +2518,55 @@ class SubGhzManager: def list_captures(self) -> list[SubGhzCapture]: captures = [] - for meta_path in sorted(self._captures_dir.glob('*.json'), reverse=True): + for meta_path in sorted(self._captures_dir.glob("*.json"), reverse=True): try: data = json.loads(meta_path.read_text()) - bursts = data.get('bursts', []) - dominant_fingerprint = data.get('dominant_fingerprint', '') + bursts = data.get("bursts", []) + dominant_fingerprint = data.get("dominant_fingerprint", "") if not dominant_fingerprint and isinstance(bursts, list): fp_counts: dict[str, int] = {} for burst in bursts: - fp = '' + fp = "" if isinstance(burst, dict): - fp = str(burst.get('fingerprint') or '').strip() + fp = str(burst.get("fingerprint") or "").strip() if not fp: continue fp_counts[fp] = fp_counts.get(fp, 0) + 1 if fp_counts: dominant_fingerprint = max(fp_counts, key=fp_counts.get) - captures.append(SubGhzCapture( - capture_id=data['id'], - filename=data['filename'], - frequency_hz=data['frequency_hz'], - sample_rate=data['sample_rate'], - lna_gain=data.get('lna_gain', 0), - vga_gain=data.get('vga_gain', 0), - timestamp=data['timestamp'], - duration_seconds=data.get('duration_seconds', 0), - size_bytes=data.get('size_bytes', 0), - label=data.get('label', ''), - label_source=data.get('label_source', ''), - decoded_protocols=data.get('decoded_protocols', []), - bursts=bursts, - modulation_hint=data.get('modulation_hint', ''), - modulation_confidence=data.get('modulation_confidence', 0.0), - protocol_hint=data.get('protocol_hint', ''), - dominant_fingerprint=dominant_fingerprint, - fingerprint_group=data.get('fingerprint_group', ''), - fingerprint_group_size=data.get('fingerprint_group_size', 0), - trigger_enabled=bool(data.get('trigger_enabled', False)), - trigger_pre_seconds=data.get('trigger_pre_seconds', 0.0), - trigger_post_seconds=data.get('trigger_post_seconds', 0.0), - )) + captures.append( + SubGhzCapture( + capture_id=data["id"], + filename=data["filename"], + frequency_hz=data["frequency_hz"], + sample_rate=data["sample_rate"], + lna_gain=data.get("lna_gain", 0), + vga_gain=data.get("vga_gain", 0), + timestamp=data["timestamp"], + duration_seconds=data.get("duration_seconds", 0), + size_bytes=data.get("size_bytes", 0), + label=data.get("label", ""), + label_source=data.get("label_source", ""), + decoded_protocols=data.get("decoded_protocols", []), + bursts=bursts, + modulation_hint=data.get("modulation_hint", ""), + modulation_confidence=data.get("modulation_confidence", 0.0), + protocol_hint=data.get("protocol_hint", ""), + dominant_fingerprint=dominant_fingerprint, + fingerprint_group=data.get("fingerprint_group", ""), + fingerprint_group_size=data.get("fingerprint_group_size", 0), + trigger_enabled=bool(data.get("trigger_enabled", False)), + trigger_pre_seconds=data.get("trigger_pre_seconds", 0.0), + trigger_post_seconds=data.get("trigger_post_seconds", 0.0), + ) + ) except (json.JSONDecodeError, KeyError, OSError) as e: logger.debug(f"Skipping invalid capture metadata {meta_path}: {e}") # Auto-group repeated fingerprints as likely same button/device clusters. fingerprint_groups: dict[str, list[SubGhzCapture]] = {} for capture in captures: - fp = (capture.dominant_fingerprint or '').strip().lower() + fp = (capture.dominant_fingerprint or "").strip().lower() if not fp: continue fingerprint_groups.setdefault(fp, []).append(capture) @@ -2430,46 +2579,46 @@ class SubGhzManager: return captures def _load_capture(self, capture_id: str) -> SubGhzCapture | None: - for meta_path in self._captures_dir.glob('*.json'): + for meta_path in self._captures_dir.glob("*.json"): try: data = json.loads(meta_path.read_text()) - if data.get('id') == capture_id: - bursts = data.get('bursts', []) - dominant_fingerprint = data.get('dominant_fingerprint', '') + if data.get("id") == capture_id: + bursts = data.get("bursts", []) + dominant_fingerprint = data.get("dominant_fingerprint", "") if not dominant_fingerprint and isinstance(bursts, list): fp_counts: dict[str, int] = {} for burst in bursts: - fp = '' + fp = "" if isinstance(burst, dict): - fp = str(burst.get('fingerprint') or '').strip() + fp = str(burst.get("fingerprint") or "").strip() if not fp: continue fp_counts[fp] = fp_counts.get(fp, 0) + 1 if fp_counts: dominant_fingerprint = max(fp_counts, key=fp_counts.get) return SubGhzCapture( - capture_id=data['id'], - filename=data['filename'], - frequency_hz=data['frequency_hz'], - sample_rate=data['sample_rate'], - lna_gain=data.get('lna_gain', 0), - vga_gain=data.get('vga_gain', 0), - timestamp=data['timestamp'], - duration_seconds=data.get('duration_seconds', 0), - size_bytes=data.get('size_bytes', 0), - label=data.get('label', ''), - label_source=data.get('label_source', ''), - decoded_protocols=data.get('decoded_protocols', []), + capture_id=data["id"], + filename=data["filename"], + frequency_hz=data["frequency_hz"], + sample_rate=data["sample_rate"], + lna_gain=data.get("lna_gain", 0), + vga_gain=data.get("vga_gain", 0), + timestamp=data["timestamp"], + duration_seconds=data.get("duration_seconds", 0), + size_bytes=data.get("size_bytes", 0), + label=data.get("label", ""), + label_source=data.get("label_source", ""), + decoded_protocols=data.get("decoded_protocols", []), bursts=bursts, - modulation_hint=data.get('modulation_hint', ''), - modulation_confidence=data.get('modulation_confidence', 0.0), - protocol_hint=data.get('protocol_hint', ''), + modulation_hint=data.get("modulation_hint", ""), + modulation_confidence=data.get("modulation_confidence", 0.0), + protocol_hint=data.get("protocol_hint", ""), dominant_fingerprint=dominant_fingerprint, - fingerprint_group=data.get('fingerprint_group', ''), - fingerprint_group_size=data.get('fingerprint_group_size', 0), - trigger_enabled=bool(data.get('trigger_enabled', False)), - trigger_pre_seconds=data.get('trigger_pre_seconds', 0.0), - trigger_post_seconds=data.get('trigger_post_seconds', 0.0), + fingerprint_group=data.get("fingerprint_group", ""), + fingerprint_group_size=data.get("fingerprint_group_size", 0), + trigger_enabled=bool(data.get("trigger_enabled", False)), + trigger_pre_seconds=data.get("trigger_pre_seconds", 0.0), + trigger_post_seconds=data.get("trigger_post_seconds", 0.0), ) except (json.JSONDecodeError, KeyError, OSError): continue @@ -2492,7 +2641,7 @@ class SubGhzManager: capture_id: str, start_seconds: float | None = None, duration_seconds: float | None = None, - label: str = '', + label: str = "", ) -> dict: """Create a trimmed capture from a selected IQ time window. @@ -2500,27 +2649,27 @@ class SubGhzManager: window is selected automatically with short padding. """ with self._lock: - if self.active_mode != 'idle': - return {'status': 'error', 'message': f'Already running: {self.active_mode}'} + if self.active_mode != "idle": + return {"status": "error", "message": f"Already running: {self.active_mode}"} capture = self._load_capture(capture_id) if not capture: - return {'status': 'error', 'message': f'Capture not found: {capture_id}'} + return {"status": "error", "message": f"Capture not found: {capture_id}"} src_path = self._captures_dir / capture.filename if not src_path.exists(): - return {'status': 'error', 'message': 'IQ file missing'} + return {"status": "error", "message": "IQ file missing"} try: src_size = src_path.stat().st_size except OSError: - return {'status': 'error', 'message': 'Unable to read capture file'} + return {"status": "error", "message": "Unable to read capture file"} if src_size < 2: - return {'status': 'error', 'message': 'Capture file has no IQ data'} + return {"status": "error", "message": "Capture file has no IQ data"} total_duration = self._estimate_capture_duration_seconds(capture, src_size) if total_duration <= 0: - return {'status': 'error', 'message': 'Unable to determine capture duration'} + return {"status": "error", "message": "Unable to determine capture duration"} use_auto_burst = start_seconds is None and duration_seconds is None auto_pad = 0.06 @@ -2530,59 +2679,63 @@ class SubGhzManager: for burst in bursts: if not isinstance(burst, dict): continue - dur = float(burst.get('duration_seconds', 0.0) or 0.0) + dur = float(burst.get("duration_seconds", 0.0) or 0.0) if dur <= 0: continue if best_burst is None: best_burst = burst continue - best_peak = float(best_burst.get('peak_level', 0.0) or 0.0) - cur_peak = float(burst.get('peak_level', 0.0) or 0.0) - if cur_peak > best_peak or cur_peak == best_peak and dur > float(best_burst.get('duration_seconds', 0.0) or 0.0): + best_peak = float(best_burst.get("peak_level", 0.0) or 0.0) + cur_peak = float(burst.get("peak_level", 0.0) or 0.0) + if ( + cur_peak > best_peak + or cur_peak == best_peak + and dur > float(best_burst.get("duration_seconds", 0.0) or 0.0) + ): best_burst = burst if best_burst: - burst_start = max(0.0, float(best_burst.get('start_seconds', 0.0) or 0.0)) - burst_dur = max(0.0, float(best_burst.get('duration_seconds', 0.0) or 0.0)) + burst_start = max(0.0, float(best_burst.get("start_seconds", 0.0) or 0.0)) + burst_dur = max(0.0, float(best_burst.get("duration_seconds", 0.0) or 0.0)) start_seconds = max(0.0, burst_start - auto_pad) end_seconds = min(total_duration, burst_start + burst_dur + auto_pad) duration_seconds = max(0.0, end_seconds - start_seconds) else: return { - 'status': 'error', - 'message': 'No burst markers available. Select a segment manually before trimming.', + "status": "error", + "message": "No burst markers available. Select a segment manually before trimming.", } try: start_s = max(0.0, float(start_seconds or 0.0)) except (TypeError, ValueError): - return {'status': 'error', 'message': 'Invalid start_seconds'} + return {"status": "error", "message": "Invalid start_seconds"} try: seg_s = None if duration_seconds is None else float(duration_seconds) except (TypeError, ValueError): - return {'status': 'error', 'message': 'Invalid duration_seconds'} + return {"status": "error", "message": "Invalid duration_seconds"} if seg_s is not None and seg_s <= 0: - return {'status': 'error', 'message': 'duration_seconds must be greater than 0'} + return {"status": "error", "message": "duration_seconds must be greater than 0"} if start_s >= total_duration: - return {'status': 'error', 'message': 'start_seconds is beyond end of capture'} + return {"status": "error", "message": "start_seconds is beyond end of capture"} end_s = total_duration if seg_s is None else min(total_duration, start_s + seg_s) if end_s <= start_s: - return {'status': 'error', 'message': 'Selected segment is empty'} + return {"status": "error", "message": "Selected segment is empty"} bytes_per_second = max(2, int(capture.sample_rate) * 2) start_byte = int(start_s * bytes_per_second) & ~1 end_byte = int(end_s * bytes_per_second) & ~1 if end_byte <= start_byte: - return {'status': 'error', 'message': 'Selected segment is too short'} + return {"status": "error", "message": "Selected segment is too short"} trim_size = end_byte - start_byte source_stem = Path(capture.filename).stem trim_name = f"{source_stem}_trim_{datetime.now().strftime('%H%M%S')}_{uuid.uuid4().hex[:4]}.iq" trim_path = self._captures_dir / trim_name try: - with open(src_path, 'rb') as src, open(trim_path, 'wb') as dst: + with open(src_path, "rb") as src, open(trim_path, "wb") as dst: src.seek(start_byte) remaining = trim_size while remaining > 0: @@ -2598,14 +2751,14 @@ class SubGhzManager: trim_path.unlink(missing_ok=True) # type: ignore[arg-type] except Exception: pass - return {'status': 'error', 'message': 'Failed to write trimmed capture'} + return {"status": "error", "message": "Failed to write trimmed capture"} if written < 2: try: trim_path.unlink(missing_ok=True) # type: ignore[arg-type] except Exception: pass - return {'status': 'error', 'message': 'Trimmed capture has no IQ data'} + return {"status": "error", "message": "Trimmed capture has no IQ data"} trimmed_duration = round(written / bytes_per_second, 3) @@ -2614,8 +2767,8 @@ class SubGhzManager: for burst in capture.bursts: if not isinstance(burst, dict): continue - burst_start = max(0.0, float(burst.get('start_seconds', 0.0) or 0.0)) - burst_dur = max(0.0, float(burst.get('duration_seconds', 0.0) or 0.0)) + burst_start = max(0.0, float(burst.get("start_seconds", 0.0) or 0.0)) + burst_dur = max(0.0, float(burst.get("duration_seconds", 0.0) or 0.0)) burst_end = burst_start + burst_dur overlap_start = max(start_s, burst_start) overlap_end = min(end_s, burst_end) @@ -2623,14 +2776,14 @@ class SubGhzManager: if overlap_dur <= 0: continue adjusted = dict(burst) - adjusted['start_seconds'] = round(overlap_start - start_s, 3) - adjusted['duration_seconds'] = round(overlap_dur, 3) + adjusted["start_seconds"] = round(overlap_start - start_s, 3) + adjusted["duration_seconds"] = round(overlap_dur, 3) adjusted_bursts.append(adjusted) - dominant_fingerprint = '' + dominant_fingerprint = "" fp_counts: dict[str, int] = {} for burst in adjusted_bursts: - fp = str(burst.get('fingerprint') or '').strip() + fp = str(burst.get("fingerprint") or "").strip() if not fp: continue fp_counts[fp] = fp_counts.get(fp, 0) + 1 @@ -2644,9 +2797,9 @@ class SubGhzManager: if adjusted_bursts: hint_totals: dict[str, float] = {} for burst in adjusted_bursts: - hint = str(burst.get('modulation_hint') or '').strip() - conf = float(burst.get('modulation_confidence') or 0.0) - if not hint or hint.lower() == 'unknown': + hint = str(burst.get("modulation_hint") or "").strip() + conf = float(burst.get("modulation_confidence") or 0.0) + if not hint or hint.lower() == "unknown": continue hint_totals[hint] = hint_totals.get(hint, 0.0) + max(0.05, conf) if hint_totals: @@ -2660,21 +2813,24 @@ class SubGhzManager: len(adjusted_bursts), ) - manual_label = str(label or '').strip() + manual_label = str(label or "").strip() if manual_label: capture_label = manual_label - label_source = 'manual' + label_source = "manual" elif capture.label: - capture_label = f'{capture.label} (Trim)' - label_source = 'auto' + capture_label = f"{capture.label} (Trim)" + label_source = "auto" else: - capture_label = self._auto_capture_label( - capture.frequency_hz, - len(adjusted_bursts), - modulation_hint, - protocol_hint, - ) + ' (Trim)' - label_source = 'auto' + capture_label = ( + self._auto_capture_label( + capture.frequency_hz, + len(adjusted_bursts), + modulation_hint, + protocol_hint, + ) + + " (Trim)" + ) + label_source = "auto" trimmed_capture = SubGhzCapture( capture_id=uuid.uuid4().hex[:12], @@ -2699,7 +2855,7 @@ class SubGhzManager: trigger_post_seconds=0.0, ) - meta_path = trim_path.with_suffix('.json') + meta_path = trim_path.with_suffix(".json") try: meta_path.write_text(json.dumps(trimmed_capture.to_dict(), indent=2)) except OSError as exc: @@ -2708,16 +2864,16 @@ class SubGhzManager: trim_path.unlink(missing_ok=True) # type: ignore[arg-type] except Exception: pass - return {'status': 'error', 'message': 'Failed to write trimmed capture metadata'} + return {"status": "error", "message": "Failed to write trimmed capture metadata"} return { - 'status': 'ok', - 'capture': trimmed_capture.to_dict(), - 'source_capture_id': capture_id, - 'segment': { - 'start_seconds': round(start_s, 3), - 'duration_seconds': round(trimmed_duration, 3), - 'auto_selected': bool(use_auto_burst), + "status": "ok", + "capture": trimmed_capture.to_dict(), + "source_capture_id": capture_id, + "segment": { + "start_seconds": round(start_s, 3), + "duration_seconds": round(trimmed_duration, 3), + "auto_selected": bool(use_auto_burst), }, } @@ -2727,7 +2883,7 @@ class SubGhzManager: return False iq_path = self._captures_dir / capture.filename - meta_path = iq_path.with_suffix('.json') + meta_path = iq_path.with_suffix(".json") deleted = False for path in (iq_path, meta_path): @@ -2740,12 +2896,12 @@ class SubGhzManager: return deleted def update_capture_label(self, capture_id: str, label: str) -> bool: - for meta_path in self._captures_dir.glob('*.json'): + for meta_path in self._captures_dir.glob("*.json"): try: data = json.loads(meta_path.read_text()) - if data.get('id') == capture_id: - data['label'] = label - data['label_source'] = 'manual' if label else data.get('label_source', '') + if data.get("id") == capture_id: + data["label"] = label + data["label_source"] = "manual" if label else data.get("label_source", "") meta_path.write_text(json.dumps(data, indent=2)) return True except (json.JSONDecodeError, KeyError, OSError): @@ -2772,11 +2928,11 @@ class SubGhzManager: self._tx_watchdog = None for proc_attr in ( - '_rx_process', - '_decode_hackrf_process', - '_decode_process', - '_tx_process', - '_sweep_process', + "_rx_process", + "_decode_hackrf_process", + "_decode_process", + "_tx_process", + "_sweep_process", ): process = getattr(self, proc_attr, None) if process and process.poll() is None: @@ -2793,7 +2949,7 @@ class SubGhzManager: self._cleanup_tx_temp_file() self._rx_file = None - self._tx_capture_id = '' + self._tx_capture_id = "" self._rx_start_time = 0 self._rx_bytes_written = 0 @@ -2802,9 +2958,9 @@ class SubGhzManager: self._rx_trigger_first_burst_start = None self._rx_trigger_last_burst_end = None self._rx_autostop_pending = False - self._rx_modulation_hint = '' + self._rx_modulation_hint = "" self._rx_modulation_confidence = 0.0 - self._rx_protocol_hint = '' + self._rx_protocol_hint = "" self._rx_fingerprint_counts = {} self._tx_start_time = 0 self._decode_start_time = 0