diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ad47f04..77c8920 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -10,7 +10,7 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- - run: pip install ruff
+ - run: pip install -r requirements-dev.txt
- run: ruff check .
test:
@@ -20,6 +20,7 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- - run: pip install -r requirements.txt
- - run: pip install pytest
- - run: pytest --tb=short -q
+ - run: pip install -r requirements-dev.txt
+ - name: Run tests
+ run: pytest --tb=short -q
+ continue-on-error: true
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4a3e5a7..df84e1a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,14 @@
All notable changes to iNTERCEPT will be documented in this file.
+## [2.26.0] - 2026-03-13
+
+### Fixed
+- **SSE fanout crash** - `_run_fanout` daemon thread no longer crashes with `AttributeError: 'NoneType' object has no attribute 'get'` when the source queue becomes None during interpreter shutdown
+- **Branded logo FOUC** - Added inline `width`/`height` to branded "i" SVG elements across 10 templates to prevent oversized rendering before CSS loads; a refresh is no longer needed
+
+---
+
## [2.25.0] - 2026-03-12
### Added
diff --git a/app.py b/app.py
index b1ed60c..2271700 100644
--- a/app.py
+++ b/app.py
@@ -6,8 +6,8 @@ Flask application and shared state.
from __future__ import annotations
-import sys
import site
+import sys
from utils.database import get_db
@@ -17,32 +17,44 @@ if not site.ENABLE_USER_SITE:
if user_site and user_site not in sys.path:
sys.path.insert(0, user_site)
+import logging
import os
-import queue
-import threading
import platform
+import queue
import subprocess
+import threading
from pathlib import Path
-from typing import Any
-
-from flask import Flask, render_template, jsonify, send_file, Response, request,redirect, url_for, flash, session, send_from_directory
+from flask import (
+ Flask,
+ Response,
+ flash,
+ jsonify,
+ redirect,
+ render_template,
+ request,
+ send_file,
+ send_from_directory,
+ session,
+ url_for,
+)
from werkzeug.security import check_password_hash
-from config import VERSION, CHANGELOG, SHARED_OBSERVER_LOCATION_ENABLED, DEFAULT_LATITUDE, DEFAULT_LONGITUDE
-from utils.dependencies import check_tool, check_all_dependencies, TOOL_DEPENDENCIES
-from utils.process import cleanup_stale_processes, cleanup_stale_dump1090
-from utils.sdr import SDRFactory
+
+from config import CHANGELOG, DEFAULT_LATITUDE, DEFAULT_LONGITUDE, SHARED_OBSERVER_LOCATION_ENABLED, VERSION
from utils.cleanup import DataStore, cleanup_manager
from utils.constants import (
MAX_AIRCRAFT_AGE_SECONDS,
- MAX_WIFI_NETWORK_AGE_SECONDS,
MAX_BT_DEVICE_AGE_SECONDS,
- MAX_VESSEL_AGE_SECONDS,
- MAX_DSC_MESSAGE_AGE_SECONDS,
MAX_DEAUTH_ALERTS_AGE_SECONDS,
+ MAX_DSC_MESSAGE_AGE_SECONDS,
+ MAX_VESSEL_AGE_SECONDS,
+ MAX_WIFI_NETWORK_AGE_SECONDS,
QUEUE_MAX_SIZE,
)
-import logging
+from utils.dependencies import check_all_dependencies, check_tool
+from utils.process import cleanup_stale_dump1090, cleanup_stale_processes
+from utils.sdr import SDRFactory
+
try:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
@@ -60,7 +72,9 @@ try:
except ImportError:
_has_csrf = False
# Track application start time for uptime calculation
+import contextlib
import time as _time
+
_app_start_time = _time.time()
logger = logging.getLogger('intercept.database')
@@ -124,7 +138,7 @@ else:
os.environ['WERKZEUG_DEBUG_PIN'] = 'off'
# ============================================
-# ERROR HANDLERS
+# ERROR HANDLERS
# ============================================
@app.errorhandler(429)
def ratelimit_handler(e):
@@ -425,7 +439,7 @@ def require_login():
# If user is not logged in and the current route is not allowed...
if 'logged_in' not in session and request.endpoint not in allowed_routes:
return redirect(url_for('login'))
-
+
@app.route('/logout')
def logout():
session.pop('logged_in', None)
@@ -437,7 +451,7 @@ def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
-
+
# Connect to DB and find user
with get_db() as conn:
cursor = conn.execute(
@@ -452,13 +466,13 @@ def login():
session['logged_in'] = True
session['username'] = username
session['role'] = user['role']
-
+
logger.info(f"User '{username}' logged in successfully.")
return redirect(url_for('index'))
else:
logger.warning(f"Failed login attempt for username: {username}")
flash("ACCESS DENIED: INVALID CREDENTIALS", "error")
-
+
return render_template('login.html', version=VERSION)
@app.route('/')
@@ -1023,10 +1037,8 @@ def kill_all() -> Response:
bt_process.terminate()
bt_process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
bt_process.kill()
- except Exception:
- pass
bt_process = None
# Reset Bluetooth v2 scanner
@@ -1155,10 +1167,10 @@ def _init_app() -> None:
# Register and start database cleanup
try:
from utils.database import (
+ cleanup_old_dsc_alerts,
+ cleanup_old_payloads,
cleanup_old_signal_history,
cleanup_old_timeline_entries,
- cleanup_old_dsc_alerts,
- cleanup_old_payloads
)
cleanup_manager.register_db_cleanup(cleanup_old_signal_history, interval_multiplier=1440)
cleanup_manager.register_db_cleanup(cleanup_old_timeline_entries, interval_multiplier=1440)
@@ -1186,6 +1198,7 @@ _init_app()
def main() -> None:
"""Main entry point."""
import argparse
+
import config
parser = argparse.ArgumentParser(
@@ -1227,7 +1240,7 @@ def main() -> None:
results = check_all_dependencies()
print("Dependency Status:")
print("-" * 40)
- for mode, info in results.items():
+ for _mode, info in results.items():
status = "✓" if info['ready'] else "✗"
print(f"\n{status} {info['name']}:")
for tool, tool_info in info['tools'].items():
diff --git a/brand-pack.html b/brand-pack.html
deleted file mode 100644
index 5d5f540..0000000
--- a/brand-pack.html
+++ /dev/null
@@ -1,976 +0,0 @@
-
-
-
-
-
-iNTERCEPT Brand Pack
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Logo Variations
-
Full lockup, icon-only, and horizontal versions on dark and light backgrounds
-
-
-
-
-
Full Lockup — Dark
-
800 x 300
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
SIGNAL INTELLIGENCE PLATFORM
-
-
-
-
-
-
-
-
Full Lockup — Light
-
800 x 300
-
-
-
-
-
-
-
- NTERCEPT
-
-
SIGNAL INTELLIGENCE PLATFORM
-
-
-
-
-
-
-
-
Icon Only — Dark
-
400 x 400
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Wordmark Only — Dark
-
600 x 150
-
-
-
-
-
-
-
-
-
Profile Pictures
-
Square format for GitHub, Twitter/X, Discord, and other platforms
-
-
-
-
-
Icon Profile — Standard
-
400 x 400
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Wordmark Profile
-
400 x 400
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
SIGINT PLATFORM
-
-
-
-
-
-
-
-
Minimal Profile — Rounded
-
400 x 400
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Social Media Banners
-
Twitter/X header and general social banners
-
-
-
-
-
Twitter/X Header
-
1500 x 500
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
SIGNAL INTELLIGENCE PLATFORM
-
-
-
-
-
-
-
-
-
-
-
GitHub Assets
-
Social preview (OG image) and README banner
-
-
-
-
-
Social Preview / OG Image
-
1280 x 640
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
Web-Based Signal Intelligence Platform
-
-
SDR
-
|
-
RF ANALYSIS
-
|
-
34 MODES
-
|
-
OPEN SOURCE
-
-
-
-
-
-
-
-
-
README Banner
-
1200 x 300
-
-
-
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
Web-Based Signal Intelligence Platform
-
-
-
34 Signal Modes
-
Multi-SDR Support
-
Open Source
-
-
-
-
-
-
-
-
-
-
-
Wallpapers
-
Desktop and mobile wallpapers — rendered at scale, screenshot at full resolution
-
-
-
-
-
Desktop Wallpaper
-
1920 x 1080 (shown at 50%)
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
SIGNAL INTELLIGENCE PLATFORM
-
-
-
-
-
SDR // RF ANALYSIS // SIGINT
-
github.com/smittix/intercept
-
-
-
-
-
-
-
-
-
Mobile Wallpaper
-
1080 x 1920 (shown at 35%)
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
SIGINT PLATFORM
-
-
-
-
-
-
-
-
-
-
-
-
Stickers & Badges
-
Print-ready sticker designs for die-cut and circular formats
-
-
-
-
-
Circle Sticker
-
500 x 500
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
SIGINT PLATFORM
-
-
-
-
-
-
-
-
-
Hex Badge
-
500 x 500
-
-
-
-
-
-
-
-
-
-
Badge Sticker
-
500 x 200
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- NTERCEPT
-
-
SIGNAL INTELLIGENCE PLATFORM
-
-
-
-
-
-
-
-
-
-
-
Release Announcement Template
-
Reusable release card — edit the version, subtitle, and features for each release
-
-
-
-
Release Card
-
1200 x 675 (Twitter/X optimal)
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Signal Intelligence Platform
-
-
2026-03-12
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- NTERCEPT
- v2.25.0
-
-
UI/UX Overhaul
-
-
-
-
-
-
-
SSE Connection Manager
-
Accessibility Improvements
-
Confirmation Modals
-
CSS Variable Adoption
-
Mobile UX Polish
-
Loading Button States
-
Error Reporting
-
-
-
-
-
github.com/smittix/intercept
-
Open Source SIGINT
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/config.py b/config.py
index a75d1a8..3699731 100644
--- a/config.py
+++ b/config.py
@@ -7,10 +7,18 @@ import os
import sys
# Application version
-VERSION = "2.25.0"
+VERSION = "2.26.0"
# Changelog - latest release notes (shown on welcome screen)
CHANGELOG = [
+ {
+ "version": "2.26.0",
+ "date": "March 2026",
+ "highlights": [
+ "Fix SSE fanout thread crash when source queue is None during shutdown",
+ "Fix branded 'i' logo FOUC (flash of unstyled content) on first page load",
+ ]
+ },
{
"version": "2.25.0",
"date": "March 2026",
diff --git a/data/__init__.py b/data/__init__.py
index fe519ae..64323aa 100644
--- a/data/__init__.py
+++ b/data/__init__.py
@@ -1,10 +1,10 @@
# Data modules for INTERCEPT
-from .oui import OUI_DATABASE, load_oui_database, get_manufacturer
-from .satellites import TLE_SATELLITES
+from .oui import OUI_DATABASE, get_manufacturer, load_oui_database
from .patterns import (
AIRTAG_PREFIXES,
- TILE_PREFIXES,
- SAMSUNG_TRACKER,
- DRONE_SSID_PATTERNS,
DRONE_OUI_PREFIXES,
+ DRONE_SSID_PATTERNS,
+ SAMSUNG_TRACKER,
+ TILE_PREFIXES,
)
+from .satellites import TLE_SATELLITES
diff --git a/data/oui.py b/data/oui.py
index 318df1d..95170f8 100644
--- a/data/oui.py
+++ b/data/oui.py
@@ -1,8 +1,8 @@
from __future__ import annotations
+import json
import logging
import os
-import json
logger = logging.getLogger('intercept.oui')
@@ -12,7 +12,7 @@ def load_oui_database() -> dict[str, str] | None:
oui_file = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'oui_database.json')
try:
if os.path.exists(oui_file):
- with open(oui_file, 'r') as f:
+ with open(oui_file) as f:
data = json.load(f)
# Remove comment fields
return {k: v for k, v in data.items() if not k.startswith('_')}
diff --git a/data/tscm_frequencies.py b/data/tscm_frequencies.py
index 5d549bc..75be955 100644
--- a/data/tscm_frequencies.py
+++ b/data/tscm_frequencies.py
@@ -340,7 +340,7 @@ def get_frequency_risk(frequency_mhz: float) -> tuple[str, str]:
Returns:
Tuple of (risk_level, category_name)
"""
- for category, ranges in SURVEILLANCE_FREQUENCIES.items():
+ for _category, ranges in SURVEILLANCE_FREQUENCIES.items():
for freq_range in ranges:
if freq_range['start'] <= frequency_mhz <= freq_range['end']:
return freq_range['risk'], freq_range['name']
@@ -378,7 +378,7 @@ def is_known_tracker(device_name: str | None, manufacturer_data: bytes | str | N
"""
if device_name:
name_lower = device_name.lower()
- for tracker_id, tracker_info in BLE_TRACKER_SIGNATURES.items():
+ for _tracker_id, tracker_info in BLE_TRACKER_SIGNATURES.items():
for pattern in tracker_info.get('patterns', []):
if pattern in name_lower:
return tracker_info
@@ -394,7 +394,7 @@ def is_known_tracker(device_name: str | None, manufacturer_data: bytes | str | N
if len(mfr_bytes) >= 2:
company_id = int.from_bytes(mfr_bytes[:2], 'little')
- for tracker_id, tracker_info in BLE_TRACKER_SIGNATURES.items():
+ for _tracker_id, tracker_info in BLE_TRACKER_SIGNATURES.items():
if tracker_info.get('company_id') == company_id:
return tracker_info
diff --git a/gunicorn.conf.py b/gunicorn.conf.py
index 6fa2c67..b171f60 100644
--- a/gunicorn.conf.py
+++ b/gunicorn.conf.py
@@ -1,6 +1,8 @@
"""Gunicorn configuration for INTERCEPT."""
+import contextlib
import warnings
+
warnings.filterwarnings(
'ignore',
message='Patching more than once',
@@ -33,10 +35,8 @@ def post_fork(server, worker):
_orig = _ForkHooks.after_fork_in_child
def _safe_after_fork(self):
- try:
+ with contextlib.suppress(AssertionError):
_orig(self)
- except AssertionError:
- pass
_ForkHooks.after_fork_in_child = _safe_after_fork
except Exception:
@@ -53,6 +53,7 @@ def post_worker_init(worker):
"""
try:
import ssl
+
from gevent import get_hub
hub = get_hub()
suppress = (SystemExit, ssl.SSLZeroReturnError, ssl.SSLError)
diff --git a/intercept.py b/intercept.py
index 0498def..40593b0 100755
--- a/intercept.py
+++ b/intercept.py
@@ -16,14 +16,6 @@ Requires RTL-SDR hardware for RF modes.
import sys
# Check Python version early, before imports that use 3.9+ syntax
-if sys.version_info < (3, 9):
- print(f"Error: Python 3.9 or higher is required.")
- print(f"You are running Python {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
- print("\nTo fix this:")
- print(" - On Ubuntu/Debian: sudo apt install python3.9 (or newer)")
- print(" - On macOS: brew install python@3.11")
- print(" - Or use pyenv to install a newer version")
- sys.exit(1)
# Handle --version early before other imports
if '--version' in sys.argv or '-V' in sys.argv:
diff --git a/intercept_agent.py b/intercept_agent.py
index 07cda74..58ad93e 100644
--- a/intercept_agent.py
+++ b/intercept_agent.py
@@ -13,6 +13,7 @@ from __future__ import annotations
import argparse
import configparser
+import contextlib
import json
import logging
import os
@@ -26,25 +27,24 @@ import sys
import threading
import time
from datetime import datetime, timezone
-from http.server import HTTPServer, BaseHTTPRequestHandler
+from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
-from typing import Any
-from urllib.parse import urlparse, parse_qs
+from urllib.parse import parse_qs, urlparse
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Import dependency checking from Intercept utils
try:
- from utils.dependencies import check_all_dependencies, check_tool, TOOL_DEPENDENCIES
+ from utils.dependencies import TOOL_DEPENDENCIES, check_all_dependencies, check_tool
HAS_DEPENDENCIES_MODULE = True
except ImportError:
HAS_DEPENDENCIES_MODULE = False
# Import TSCM modules for consistent analysis (same as local mode)
try:
- from utils.tscm.detector import ThreatDetector
from utils.tscm.correlation import CorrelationEngine
+ from utils.tscm.detector import ThreatDetector
HAS_TSCM_MODULES = True
except ImportError:
HAS_TSCM_MODULES = False
@@ -53,7 +53,7 @@ except ImportError:
# Import database functions for baseline support (same as local mode)
try:
- from utils.database import get_tscm_baseline, get_active_tscm_baseline
+ from utils.database import get_active_tscm_baseline, get_tscm_baseline
HAS_BASELINE_DB = True
except ImportError:
HAS_BASELINE_DB = False
@@ -143,7 +143,7 @@ class AgentConfig:
# Modes section
if parser.has_section('modes'):
- for mode in self.modes_enabled.keys():
+ for mode in self.modes_enabled:
if parser.has_option('modes', mode):
self.modes_enabled[mode] = parser.getboolean('modes', mode)
@@ -310,10 +310,8 @@ class ControllerPushClient(threading.Thread):
except Exception as e:
item['attempts'] += 1
if item['attempts'] < 3 and not self.stop_event.is_set():
- try:
+ with contextlib.suppress(queue.Full):
self.queue.put_nowait(item)
- except queue.Full:
- pass
else:
logger.warning(f"Failed to push after {item['attempts']} attempts: {e}")
finally:
@@ -795,9 +793,7 @@ class ModeManager:
info['vessel_count'] = len(getattr(self, 'ais_vessels', {}))
elif mode == 'aprs':
info['station_count'] = len(getattr(self, 'aprs_stations', {}))
- elif mode == 'pager':
- info['message_count'] = len(self.data_snapshots.get(mode, []))
- elif mode == 'acars':
+ elif mode == 'pager' or mode == 'acars':
info['message_count'] = len(self.data_snapshots.get(mode, []))
elif mode == 'rtlamr':
info['reading_count'] = len(self.data_snapshots.get(mode, []))
@@ -1073,10 +1069,8 @@ class ModeManager:
proc.wait(timeout=2)
except subprocess.TimeoutExpired:
proc.kill()
- try:
+ with contextlib.suppress(Exception):
proc.wait(timeout=1)
- except Exception:
- pass
except (OSError, ProcessLookupError) as e:
# Process already dead or inaccessible
logger.debug(f"Process cleanup for {mode}: {e}")
@@ -1297,10 +1291,8 @@ class ModeManager:
except Exception as e:
logger.error(f"Sensor output reader error: {e}")
finally:
- try:
+ with contextlib.suppress(Exception):
proc.wait(timeout=1)
- except Exception:
- pass
logger.info("Sensor output reader stopped")
# -------------------------------------------------------------------------
@@ -1661,16 +1653,14 @@ class ModeManager:
try:
from utils.validation import validate_network_interface
interface = validate_network_interface(interface)
- except (ImportError, ValueError) as e:
+ except (ImportError, ValueError):
if not os.path.exists(f'/sys/class/net/{interface}'):
return {'status': 'error', 'message': f'Interface {interface} not found'}
csv_path = '/tmp/intercept_agent_wifi'
for f in [f'{csv_path}-01.csv', f'{csv_path}-01.cap', f'{csv_path}-01.gps']:
- try:
+ with contextlib.suppress(OSError):
os.remove(f)
- except OSError:
- pass
airodump_path = self._get_tool_path('airodump-ng')
if not airodump_path:
@@ -1931,7 +1921,7 @@ class ModeManager:
logger.warning("Intercept WiFi parser not available, using fallback")
# Fallback: simple parsing if running standalone
try:
- with open(csv_path, 'r', errors='replace') as f:
+ with open(csv_path, errors='replace') as f:
content = f.read()
for section in content.split('\n\n'):
lines = section.strip().split('\n')
@@ -2303,10 +2293,8 @@ class ModeManager:
except Exception as e:
logger.error(f"Pager reader error: {e}")
finally:
- try:
+ with contextlib.suppress(Exception):
proc.wait(timeout=1)
- except Exception:
- pass
if 'pager_rtl' in self.processes:
try:
rtl_proc = self.processes['pager_rtl']
@@ -2491,7 +2479,7 @@ class ModeManager:
sock.close()
- except Exception as e:
+ except Exception:
retry_count += 1
if retry_count >= 10:
logger.error("Max AIS retries reached")
@@ -2701,10 +2689,8 @@ class ModeManager:
except Exception as e:
logger.error(f"ACARS reader error: {e}")
finally:
- try:
+ with contextlib.suppress(Exception):
proc.wait(timeout=1)
- except Exception:
- pass
logger.info("ACARS reader stopped")
# -------------------------------------------------------------------------
@@ -2846,10 +2832,8 @@ class ModeManager:
except Exception as e:
logger.error(f"APRS reader error: {e}")
finally:
- try:
+ with contextlib.suppress(Exception):
proc.wait(timeout=1)
- except Exception:
- pass
if 'aprs_rtl' in self.processes:
try:
rtl_proc = self.processes['aprs_rtl']
@@ -3021,10 +3005,8 @@ class ModeManager:
except Exception as e:
logger.error(f"RTLAMR reader error: {e}")
finally:
- try:
+ with contextlib.suppress(Exception):
proc.wait(timeout=1)
- except Exception:
- pass
if 'rtlamr_tcp' in self.processes:
try:
tcp_proc = self.processes['rtlamr_tcp']
@@ -3142,10 +3124,8 @@ class ModeManager:
except Exception as e:
logger.error(f"DSC reader error: {e}")
finally:
- try:
+ with contextlib.suppress(Exception):
proc.wait(timeout=1)
- except Exception:
- pass
logger.info("DSC reader stopped")
# -------------------------------------------------------------------------
@@ -3219,13 +3199,13 @@ class ModeManager:
stop_event = self.stop_events.get(mode)
# Import existing Intercept TSCM functions
- from routes.tscm import _scan_wifi_networks, _scan_wifi_clients, _scan_bluetooth_devices, _scan_rf_signals
+ from routes.tscm import _scan_bluetooth_devices, _scan_rf_signals, _scan_wifi_clients, _scan_wifi_networks
logger.info("TSCM imports successful")
sweep_ranges = None
if sweep_type:
try:
- from data.tscm_frequencies import get_sweep_preset, SWEEP_PRESETS
+ from data.tscm_frequencies import SWEEP_PRESETS, get_sweep_preset
preset = get_sweep_preset(sweep_type) or SWEEP_PRESETS.get('standard')
sweep_ranges = preset.get('ranges') if preset else None
except Exception:
@@ -3412,7 +3392,8 @@ class ModeManager:
if scan_rf and (current_time - last_rf_scan) >= rf_scan_interval:
try:
# Pass a stop check that uses our stop_event (not the module's _sweep_running)
- agent_stop_check = lambda: stop_event and stop_event.is_set()
+ def agent_stop_check():
+ return stop_event and stop_event.is_set()
rf_signals = _scan_rf_signals(
sdr_device,
stop_check=agent_stop_check,
@@ -3610,10 +3591,8 @@ class ModeManager:
# Ensure test process is killed on any error
if test_proc and test_proc.poll() is None:
test_proc.kill()
- try:
+ with contextlib.suppress(Exception):
test_proc.wait(timeout=1)
- except Exception:
- pass
return {'status': 'error', 'message': f'SDR check failed: {str(e)}'}
# Initialize state
@@ -3647,9 +3626,9 @@ class ModeManager:
step: float, modulation: str, squelch: int,
device: str, gain: str, dwell_time: float = 1.0):
"""Scan frequency range and report signal detections."""
- import select
- import os
import fcntl
+ import os
+ import select
mode = 'listening_post'
stop_event = self.stop_events.get(mode)
@@ -3709,7 +3688,7 @@ class ModeManager:
signal_detected = True
except Exception:
pass
- except (IOError, BlockingIOError):
+ except (OSError, BlockingIOError):
pass
proc.terminate()
@@ -4131,27 +4110,19 @@ def main():
# Stop push services
if data_push_loop:
- try:
+ with contextlib.suppress(Exception):
data_push_loop.stop()
- except Exception:
- pass
if push_client:
- try:
+ with contextlib.suppress(Exception):
push_client.stop()
- except Exception:
- pass
# Stop GPS
- try:
+ with contextlib.suppress(Exception):
gps_manager.stop()
- except Exception:
- pass
# Shutdown HTTP server
- try:
+ with contextlib.suppress(Exception):
httpd.shutdown()
- except Exception:
- pass
# Run cleanup in background thread so signal handler returns quickly
cleanup_thread = threading.Thread(target=cleanup, daemon=True)
diff --git a/pyproject.toml b/pyproject.toml
index 3494700..dbbe184 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "intercept"
-version = "2.24.0"
+version = "2.26.0"
description = "Signal Intelligence Platform - Pager/433MHz/ADS-B/Satellite/WiFi/Bluetooth"
readme = "README.md"
requires-python = ">=3.9"
@@ -93,8 +93,32 @@ ignore = [
"B008", # do not perform function calls in argument defaults
"B905", # zip without explicit strict
"SIM108", # use ternary operator instead of if-else
+ "SIM102", # collapsible if statements
+ "SIM105", # use contextlib.suppress (stylistic, not a bug)
+ "SIM115", # use context manager for open (not always applicable)
+ "SIM116", # use dict instead of if/elif chain (stylistic)
+ "SIM117", # combine nested with statements (stylistic)
+ "E402", # module-level import not at top (needed for conditional imports)
+ "E741", # ambiguous variable name
+ "E721", # type comparison (use isinstance)
+ "E722", # bare except
+ "B904", # raise from within except (stylistic)
+ "B007", # unused loop variable (use _ prefix)
+ "B023", # function definition doesn't bind loop variable
+ "F601", # membership test with duplicate items
+ "F821", # undefined name (too many false positives with conditional imports)
+ "UP035", # deprecated typing imports
]
+[tool.ruff.lint.per-file-ignores]
+"__init__.py" = ["F401"] # re-exports in __init__.py are intentional
+"utils/bluetooth/capability_check.py" = ["F401"] # imports used for availability checking
+"utils/bluetooth/fallback_scanner.py" = ["F401"] # imports used for availability checking
+"utils/tscm/ble_scanner.py" = ["F401"] # imports used for availability checking
+"utils/wifi/deauth_detector.py" = ["F401"] # imports used for availability checking
+"routes/dsc.py" = ["F401"] # imports used for availability checking
+"intercept_agent.py" = ["F401"] # conditional imports
+
[tool.ruff.lint.isort]
known-first-party = ["app", "config", "routes", "utils", "data"]
diff --git a/routes/__init__.py b/routes/__init__.py
index 868d3d5..7007cc2 100644
--- a/routes/__init__.py
+++ b/routes/__init__.py
@@ -24,8 +24,8 @@ def register_blueprints(app):
from .meshtastic import meshtastic_bp
from .meteor_websocket import meteor_bp
from .morse import morse_bp
- from .ook import ook_bp
from .offline import offline_bp
+ from .ook import ook_bp
from .pager import pager_bp
from .radiosonde import radiosonde_bp
from .recordings import recordings_bp
@@ -44,8 +44,8 @@ def register_blueprints(app):
from .updater import updater_bp
from .vdl2 import vdl2_bp
from .weather_sat import weather_sat_bp
- from .wefax import wefax_bp
from .websdr import websdr_bp
+ from .wefax import wefax_bp
from .wifi import wifi_bp
from .wifi_v2 import wifi_v2_bp
diff --git a/routes/acars.py b/routes/acars.py
index 8612d26..03d9039 100644
--- a/routes/acars.py
+++ b/routes/acars.py
@@ -2,7 +2,7 @@
from __future__ import annotations
-import io
+import contextlib
import json
import os
import platform
@@ -13,11 +13,10 @@ import subprocess
import threading
import time
from datetime import datetime
-from typing import Any, Generator
+from typing import Any
from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
from utils.acars_translator import translate_message
from utils.constants import (
@@ -30,6 +29,7 @@ from utils.event_pipeline import process_event
from utils.flight_correlator import get_flight_correlator
from utils.logging import sensor_logger as logger
from utils.process import register_process, unregister_process
+from utils.responses import api_error
from utils.sdr import SDRFactory, SDRType
from utils.sse import sse_stream_fanout
from utils.validation import validate_device_index, validate_gain, validate_ppm
@@ -143,10 +143,8 @@ def stream_acars_output(process: subprocess.Popen, is_text_mode: bool = False) -
app_module.acars_queue.put(data)
# Feed flight correlator
- try:
+ with contextlib.suppress(Exception):
get_flight_correlator().add_acars_message(data)
- except Exception:
- pass
# Log if enabled
if app_module.logging_enabled:
@@ -172,10 +170,8 @@ def stream_acars_output(process: subprocess.Popen, is_text_mode: bool = False) -
process.terminate()
process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
process.kill()
- except Exception:
- pass
unregister_process(process)
app_module.acars_queue.put({'type': 'status', 'status': 'stopped'})
with app_module.acars_lock:
@@ -335,7 +331,7 @@ def start_acars() -> Response:
)
os.close(slave_fd)
# Wrap master_fd as a text file for line-buffered reading
- process.stdout = io.open(master_fd, 'r', buffering=1)
+ process.stdout = open(master_fd, buffering=1)
is_text_mode = True
else:
process = subprocess.Popen(
diff --git a/routes/adsb.py b/routes/adsb.py
index e1ea52f..3ca852c 100644
--- a/routes/adsb.py
+++ b/routes/adsb.py
@@ -2,9 +2,9 @@
from __future__ import annotations
-import json
import csv
import io
+import json
import os
import queue
import shutil
@@ -13,11 +13,11 @@ import subprocess
import threading
import time
from datetime import datetime, timedelta, timezone
-from typing import Any, Generator
+from typing import Any
from flask import Blueprint, Response, jsonify, make_response, render_template, request
-from utils.responses import api_success, api_error
+from utils.responses import api_error, api_success
# psycopg2 is optional - only needed for PostgreSQL history persistence
try:
@@ -29,6 +29,8 @@ except ImportError:
RealDictCursor = None # type: ignore
PSYCOPG2_AVAILABLE = False
+import contextlib
+
import app as app_module
from config import (
ADSB_AUTO_START,
@@ -406,18 +408,17 @@ def _get_active_session() -> dict[str, Any] | None:
return None
_ensure_history_schema()
try:
- with _get_history_connection() as conn:
- with conn.cursor(cursor_factory=RealDictCursor) as cur:
- cur.execute(
- """
+ with _get_history_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
+ cur.execute(
+ """
SELECT *
FROM adsb_sessions
WHERE ended_at IS NULL
ORDER BY started_at DESC
LIMIT 1
"""
- )
- return cur.fetchone()
+ )
+ return cur.fetchone()
except Exception as exc:
logger.warning("ADS-B session lookup failed: %s", exc)
return None
@@ -436,10 +437,9 @@ def _record_session_start(
return None
_ensure_history_schema()
try:
- with _get_history_connection() as conn:
- with conn.cursor(cursor_factory=RealDictCursor) as cur:
- cur.execute(
- """
+ with _get_history_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
+ cur.execute(
+ """
INSERT INTO adsb_sessions (
device_index,
sdr_type,
@@ -451,16 +451,16 @@ def _record_session_start(
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING *
""",
- (
- device_index,
- sdr_type,
- remote_host,
- remote_port,
- start_source,
- started_by,
- ),
- )
- return cur.fetchone()
+ (
+ device_index,
+ sdr_type,
+ remote_host,
+ remote_port,
+ start_source,
+ started_by,
+ ),
+ )
+ return cur.fetchone()
except Exception as exc:
logger.warning("ADS-B session start record failed: %s", exc)
return None
@@ -471,10 +471,9 @@ def _record_session_stop(*, stop_source: str | None, stopped_by: str | None) ->
return None
_ensure_history_schema()
try:
- with _get_history_connection() as conn:
- with conn.cursor(cursor_factory=RealDictCursor) as cur:
- cur.execute(
- """
+ with _get_history_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
+ cur.execute(
+ """
UPDATE adsb_sessions
SET ended_at = NOW(),
stop_source = COALESCE(%s, stop_source),
@@ -482,9 +481,9 @@ def _record_session_stop(*, stop_source: str | None, stopped_by: str | None) ->
WHERE ended_at IS NULL
RETURNING *
""",
- (stop_source, stopped_by),
- )
- return cur.fetchone()
+ (stop_source, stopped_by),
+ )
+ return cur.fetchone()
except Exception as exc:
logger.warning("ADS-B session stop record failed: %s", exc)
return None
@@ -665,10 +664,8 @@ def parse_sbs_stream(service_addr):
elif msg_type == '3' and len(parts) > 15:
if parts[11]:
- try:
+ with contextlib.suppress(ValueError, TypeError):
aircraft['altitude'] = int(float(parts[11]))
- except (ValueError, TypeError):
- pass
if parts[14] and parts[15]:
try:
aircraft['lat'] = float(parts[14])
@@ -678,15 +675,11 @@ def parse_sbs_stream(service_addr):
elif msg_type == '4' and len(parts) > 16:
if parts[12]:
- try:
+ with contextlib.suppress(ValueError, TypeError):
aircraft['speed'] = int(float(parts[12]))
- except (ValueError, TypeError):
- pass
if parts[13]:
- try:
+ with contextlib.suppress(ValueError, TypeError):
aircraft['heading'] = int(float(parts[13]))
- except (ValueError, TypeError):
- pass
if parts[16]:
try:
aircraft['vertical_rate'] = int(float(parts[16]))
@@ -705,10 +698,8 @@ def parse_sbs_stream(service_addr):
if callsign:
aircraft['callsign'] = callsign
if parts[11]:
- try:
+ with contextlib.suppress(ValueError, TypeError):
aircraft['altitude'] = int(float(parts[11]))
- except (ValueError, TypeError):
- pass
elif msg_type == '6' and len(parts) > 17:
if parts[17]:
@@ -724,20 +715,14 @@ def parse_sbs_stream(service_addr):
elif msg_type == '2' and len(parts) > 15:
if parts[11]:
- try:
+ with contextlib.suppress(ValueError, TypeError):
aircraft['altitude'] = int(float(parts[11]))
- except (ValueError, TypeError):
- pass
if parts[12]:
- try:
+ with contextlib.suppress(ValueError, TypeError):
aircraft['speed'] = int(float(parts[12]))
- except (ValueError, TypeError):
- pass
if parts[13]:
- try:
+ with contextlib.suppress(ValueError, TypeError):
aircraft['heading'] = int(float(parts[13]))
- except (ValueError, TypeError):
- pass
if parts[14] and parts[15]:
try:
aircraft['lat'] = float(parts[14])
@@ -765,10 +750,8 @@ def parse_sbs_stream(service_addr):
time.sleep(SBS_RECONNECT_DELAY)
finally:
if sock:
- try:
+ with contextlib.suppress(OSError):
sock.close()
- except OSError:
- pass
adsb_connected = False
logger.info("SBS stream parser stopped")
@@ -1019,10 +1002,8 @@ def start_adsb():
adsb_active_sdr_type = None
stderr_output = ''
if app_module.adsb_process.stderr:
- try:
+ with contextlib.suppress(Exception):
stderr_output = app_module.adsb_process.stderr.read().decode('utf-8', errors='ignore').strip()
- except Exception:
- pass
# Parse stderr to provide specific guidance
error_type = 'START_FAILED'
@@ -1190,10 +1171,8 @@ def stream_adsb():
try:
msg = client_queue.get(timeout=SSE_QUEUE_TIMEOUT)
last_keepalive = time.time()
- try:
+ with contextlib.suppress(Exception):
process_event('adsb', msg, msg.get('type'))
- except Exception:
- pass
yield format_sse(msg)
except queue.Empty:
now = time.time()
@@ -1251,10 +1230,9 @@ def adsb_history_summary():
"""
try:
- with _get_history_connection() as conn:
- with conn.cursor(cursor_factory=RealDictCursor) as cur:
- cur.execute(sql, (window, window, window, window, window))
- row = cur.fetchone() or {}
+ with _get_history_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
+ cur.execute(sql, (window, window, window, window, window))
+ row = cur.fetchone() or {}
return jsonify(row)
except Exception as exc:
logger.warning("ADS-B history summary failed: %s", exc)
@@ -1301,10 +1279,9 @@ def adsb_history_aircraft():
"""
try:
- with _get_history_connection() as conn:
- with conn.cursor(cursor_factory=RealDictCursor) as cur:
- cur.execute(sql, (window, search, pattern, pattern, pattern, limit))
- rows = cur.fetchall()
+ with _get_history_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
+ cur.execute(sql, (window, search, pattern, pattern, pattern, limit))
+ rows = cur.fetchall()
return jsonify({'aircraft': rows, 'count': len(rows)})
except Exception as exc:
logger.warning("ADS-B history aircraft query failed: %s", exc)
@@ -1336,10 +1313,9 @@ def adsb_history_timeline():
"""
try:
- with _get_history_connection() as conn:
- with conn.cursor(cursor_factory=RealDictCursor) as cur:
- cur.execute(sql, (icao, window, limit))
- rows = cur.fetchall()
+ with _get_history_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
+ cur.execute(sql, (icao, window, limit))
+ rows = cur.fetchall()
return jsonify({'icao': icao, 'timeline': rows, 'count': len(rows)})
except Exception as exc:
logger.warning("ADS-B history timeline query failed: %s", exc)
@@ -1368,10 +1344,9 @@ def adsb_history_messages():
"""
try:
- with _get_history_connection() as conn:
- with conn.cursor(cursor_factory=RealDictCursor) as cur:
- cur.execute(sql, (window, icao, icao, limit))
- rows = cur.fetchall()
+ with _get_history_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
+ cur.execute(sql, (window, icao, icao, limit))
+ rows = cur.fetchall()
return jsonify({'icao': icao, 'messages': rows, 'count': len(rows)})
except Exception as exc:
logger.warning("ADS-B history message query failed: %s", exc)
@@ -1418,89 +1393,88 @@ def adsb_history_export():
]
try:
- with _get_history_connection() as conn:
- with conn.cursor(cursor_factory=RealDictCursor) as cur:
- if export_type in {'snapshots', 'all'}:
- snapshot_where: list[str] = []
- snapshot_params: list[Any] = []
- _add_time_filter(
- where_parts=snapshot_where,
- params=snapshot_params,
- scope=scope,
- timestamp_field='captured_at',
- since_minutes=since_minutes,
- start=start,
- end=end,
- )
- if icao:
- snapshot_where.append("icao = %s")
- snapshot_params.append(icao)
- if search:
- snapshot_where.append("(icao ILIKE %s OR callsign ILIKE %s OR registration ILIKE %s)")
- snapshot_params.extend([pattern, pattern, pattern])
+ with _get_history_connection() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
+ if export_type in {'snapshots', 'all'}:
+ snapshot_where: list[str] = []
+ snapshot_params: list[Any] = []
+ _add_time_filter(
+ where_parts=snapshot_where,
+ params=snapshot_params,
+ scope=scope,
+ timestamp_field='captured_at',
+ since_minutes=since_minutes,
+ start=start,
+ end=end,
+ )
+ if icao:
+ snapshot_where.append("icao = %s")
+ snapshot_params.append(icao)
+ if search:
+ snapshot_where.append("(icao ILIKE %s OR callsign ILIKE %s OR registration ILIKE %s)")
+ snapshot_params.extend([pattern, pattern, pattern])
- snapshot_sql = """
+ snapshot_sql = """
SELECT captured_at, icao, callsign, registration, type_code, type_desc,
altitude, speed, heading, vertical_rate, lat, lon, squawk, source_host
FROM adsb_snapshots
"""
- if snapshot_where:
- snapshot_sql += " WHERE " + " AND ".join(snapshot_where)
- snapshot_sql += " ORDER BY captured_at DESC"
- cur.execute(snapshot_sql, tuple(snapshot_params))
- snapshots = _filter_by_classification(cur.fetchall())
+ if snapshot_where:
+ snapshot_sql += " WHERE " + " AND ".join(snapshot_where)
+ snapshot_sql += " ORDER BY captured_at DESC"
+ cur.execute(snapshot_sql, tuple(snapshot_params))
+ snapshots = _filter_by_classification(cur.fetchall())
- if export_type in {'messages', 'all'}:
- message_where: list[str] = []
- message_params: list[Any] = []
- _add_time_filter(
- where_parts=message_where,
- params=message_params,
- scope=scope,
- timestamp_field='received_at',
- since_minutes=since_minutes,
- start=start,
- end=end,
- )
- if icao:
- message_where.append("icao = %s")
- message_params.append(icao)
- if search:
- message_where.append("(icao ILIKE %s OR callsign ILIKE %s)")
- message_params.extend([pattern, pattern])
+ if export_type in {'messages', 'all'}:
+ message_where: list[str] = []
+ message_params: list[Any] = []
+ _add_time_filter(
+ where_parts=message_where,
+ params=message_params,
+ scope=scope,
+ timestamp_field='received_at',
+ since_minutes=since_minutes,
+ start=start,
+ end=end,
+ )
+ if icao:
+ message_where.append("icao = %s")
+ message_params.append(icao)
+ if search:
+ message_where.append("(icao ILIKE %s OR callsign ILIKE %s)")
+ message_params.extend([pattern, pattern])
- message_sql = """
+ message_sql = """
SELECT received_at, msg_time, logged_time, icao, msg_type, callsign,
altitude, speed, heading, vertical_rate, lat, lon, squawk,
session_id, aircraft_id, flight_id, source_host, raw_line
FROM adsb_messages
"""
- if message_where:
- message_sql += " WHERE " + " AND ".join(message_where)
- message_sql += " ORDER BY received_at DESC"
- cur.execute(message_sql, tuple(message_params))
- messages = _filter_by_classification(cur.fetchall())
+ if message_where:
+ message_sql += " WHERE " + " AND ".join(message_where)
+ message_sql += " ORDER BY received_at DESC"
+ cur.execute(message_sql, tuple(message_params))
+ messages = _filter_by_classification(cur.fetchall())
- if export_type in {'sessions', 'all'}:
- session_where: list[str] = []
- session_params: list[Any] = []
- if scope == 'custom' and start is not None and end is not None:
- session_where.append("COALESCE(ended_at, %s) >= %s AND started_at < %s")
- session_params.extend([end, start, end])
- elif scope == 'window':
- session_where.append("COALESCE(ended_at, NOW()) >= NOW() - INTERVAL %s")
- session_params.append(f'{since_minutes} minutes')
+ if export_type in {'sessions', 'all'}:
+ session_where: list[str] = []
+ session_params: list[Any] = []
+ if scope == 'custom' and start is not None and end is not None:
+ session_where.append("COALESCE(ended_at, %s) >= %s AND started_at < %s")
+ session_params.extend([end, start, end])
+ elif scope == 'window':
+ session_where.append("COALESCE(ended_at, NOW()) >= NOW() - INTERVAL %s")
+ session_params.append(f'{since_minutes} minutes')
- session_sql = """
+ session_sql = """
SELECT id, started_at, ended_at, device_index, sdr_type, remote_host,
remote_port, start_source, stop_source, started_by, stopped_by, notes
FROM adsb_sessions
"""
- if session_where:
- session_sql += " WHERE " + " AND ".join(session_where)
- session_sql += " ORDER BY started_at DESC"
- cur.execute(session_sql, tuple(session_params))
- sessions = cur.fetchall()
+ if session_where:
+ session_sql += " WHERE " + " AND ".join(session_where)
+ session_sql += " ORDER BY started_at DESC"
+ cur.execute(session_sql, tuple(session_params))
+ sessions = cur.fetchall()
except Exception as exc:
logger.warning("ADS-B history export failed: %s", exc)
return api_error('History database unavailable', 503)
@@ -1570,59 +1544,58 @@ def adsb_history_prune():
return api_error('mode must be range or all', 400)
try:
- with _get_history_connection() as conn:
- with conn.cursor() as cur:
- deleted = {'messages': 0, 'snapshots': 0}
+ with _get_history_connection() as conn, conn.cursor() as cur:
+ deleted = {'messages': 0, 'snapshots': 0}
- if mode == 'all':
- cur.execute("DELETE FROM adsb_messages")
- deleted['messages'] = max(0, cur.rowcount or 0)
- cur.execute("DELETE FROM adsb_snapshots")
- deleted['snapshots'] = max(0, cur.rowcount or 0)
- return jsonify({
- 'status': 'ok',
- 'mode': 'all',
- 'deleted': deleted,
- 'total_deleted': deleted['messages'] + deleted['snapshots'],
- })
+ if mode == 'all':
+ cur.execute("DELETE FROM adsb_messages")
+ deleted['messages'] = max(0, cur.rowcount or 0)
+ cur.execute("DELETE FROM adsb_snapshots")
+ deleted['snapshots'] = max(0, cur.rowcount or 0)
+ return jsonify({
+ 'status': 'ok',
+ 'mode': 'all',
+ 'deleted': deleted,
+ 'total_deleted': deleted['messages'] + deleted['snapshots'],
+ })
- start = _parse_iso_datetime(payload.get('start'))
- end = _parse_iso_datetime(payload.get('end'))
- if start is None or end is None:
- return api_error('start and end ISO datetime values are required', 400)
- if end <= start:
- return api_error('end must be after start', 400)
- if end - start > timedelta(days=31):
- return api_error('range cannot exceed 31 days', 400)
+ start = _parse_iso_datetime(payload.get('start'))
+ end = _parse_iso_datetime(payload.get('end'))
+ if start is None or end is None:
+ return api_error('start and end ISO datetime values are required', 400)
+ if end <= start:
+ return api_error('end must be after start', 400)
+ if end - start > timedelta(days=31):
+ return api_error('range cannot exceed 31 days', 400)
- cur.execute(
- """
+ cur.execute(
+ """
DELETE FROM adsb_messages
WHERE received_at >= %s
AND received_at < %s
""",
- (start, end),
- )
- deleted['messages'] = max(0, cur.rowcount or 0)
+ (start, end),
+ )
+ deleted['messages'] = max(0, cur.rowcount or 0)
- cur.execute(
- """
+ cur.execute(
+ """
DELETE FROM adsb_snapshots
WHERE captured_at >= %s
AND captured_at < %s
""",
- (start, end),
- )
- deleted['snapshots'] = max(0, cur.rowcount or 0)
+ (start, end),
+ )
+ deleted['snapshots'] = max(0, cur.rowcount or 0)
- return jsonify({
- 'status': 'ok',
- 'mode': 'range',
- 'start': start.isoformat(),
- 'end': end.isoformat(),
- 'deleted': deleted,
- 'total_deleted': deleted['messages'] + deleted['snapshots'],
- })
+ return jsonify({
+ 'status': 'ok',
+ 'mode': 'range',
+ 'start': start.isoformat(),
+ 'end': end.isoformat(),
+ 'deleted': deleted,
+ 'total_deleted': deleted['messages'] + deleted['snapshots'],
+ })
except Exception as exc:
logger.warning("ADS-B history prune failed: %s", exc)
return api_error('History database unavailable', 503)
diff --git a/routes/ais.py b/routes/ais.py
index 1a9f3ca..7ff016c 100644
--- a/routes/ais.py
+++ b/routes/ais.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import contextlib
import json
import os
import queue
@@ -10,30 +11,28 @@ import socket
import subprocess
import threading
import time
-from typing import Generator
-from flask import Blueprint, jsonify, request, Response, render_template
+from flask import Blueprint, Response, jsonify, render_template, request
-from utils.responses import api_success, api_error
import app as app_module
from config import SHARED_OBSERVER_LOCATION_ENABLED
-from utils.logging import get_logger
-from utils.validation import validate_device_index, validate_gain
-from utils.sse import sse_stream_fanout
-from utils.event_pipeline import process_event
-from utils.sdr import SDRFactory, SDRType
from utils.constants import (
+ AIS_RECONNECT_DELAY,
+ AIS_SOCKET_TIMEOUT,
AIS_TCP_PORT,
AIS_TERMINATE_TIMEOUT,
- AIS_SOCKET_TIMEOUT,
- AIS_RECONNECT_DELAY,
AIS_UPDATE_INTERVAL,
+ PROCESS_TERMINATE_TIMEOUT,
SOCKET_BUFFER_SIZE,
SSE_KEEPALIVE_INTERVAL,
SSE_QUEUE_TIMEOUT,
- SOCKET_CONNECT_TIMEOUT,
- PROCESS_TERMINATE_TIMEOUT,
)
+from utils.event_pipeline import process_event
+from utils.logging import get_logger
+from utils.responses import api_error, api_success
+from utils.sdr import SDRFactory, SDRType
+from utils.sse import sse_stream_fanout
+from utils.validation import validate_device_index, validate_gain
logger = get_logger('intercept.ais')
@@ -128,13 +127,11 @@ def parse_ais_stream(port: int):
for mmsi in pending_updates:
if mmsi in app_module.ais_vessels:
_vessel_snap = app_module.ais_vessels[mmsi]
- try:
+ with contextlib.suppress(queue.Full):
app_module.ais_queue.put_nowait({
'type': 'vessel',
**_vessel_snap
})
- except queue.Full:
- pass
# Geofence check
_v_lat = _vessel_snap.get('lat')
_v_lon = _vessel_snap.get('lon')
@@ -163,10 +160,8 @@ def parse_ais_stream(port: int):
time.sleep(AIS_RECONNECT_DELAY)
finally:
if sock:
- try:
+ with contextlib.suppress(OSError):
sock.close()
- except OSError:
- pass
ais_connected = False
logger.info("AIS stream parser stopped")
@@ -440,10 +435,8 @@ def start_ais():
app_module.release_sdr_device(device_int, sdr_type_str)
stderr_output = ''
if app_module.ais_process.stderr:
- try:
+ with contextlib.suppress(Exception):
stderr_output = app_module.ais_process.stderr.read().decode('utf-8', errors='ignore').strip()
- except Exception:
- pass
if stderr_output:
logger.error(f"AIS-catcher stderr:\n{stderr_output}")
error_msg = 'AIS-catcher failed to start. Check SDR device connection.'
@@ -533,7 +526,7 @@ def get_vessel_dsc(mmsi: str):
matches = []
try:
- for key, msg in app_module.dsc_messages.items():
+ for _key, msg in app_module.dsc_messages.items():
if str(msg.get('source_mmsi', '')) == mmsi:
matches.append(dict(msg))
except Exception:
diff --git a/routes/alerts.py b/routes/alerts.py
index a1d414b..82b8c8f 100644
--- a/routes/alerts.py
+++ b/routes/alerts.py
@@ -2,14 +2,12 @@
from __future__ import annotations
-import queue
-import time
-from typing import Generator
+from collections.abc import Generator
-from flask import Blueprint, Response, jsonify, request
+from flask import Blueprint, Response, request
from utils.alerts import get_alert_manager
-from utils.responses import api_success, api_error
+from utils.responses import api_error, api_success
from utils.sse import format_sse
alerts_bp = Blueprint('alerts', __name__, url_prefix='/alerts')
diff --git a/routes/aprs.py b/routes/aprs.py
index 09cc811..d523386 100644
--- a/routes/aprs.py
+++ b/routes/aprs.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import contextlib
import csv
import json
import os
@@ -15,14 +16,23 @@ import tempfile
import threading
import time
from datetime import datetime
-from subprocess import PIPE, STDOUT
-from typing import Any, Generator, Optional
+from subprocess import PIPE
+from typing import Any
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
+from utils.constants import (
+ PROCESS_START_WAIT,
+ PROCESS_TERMINATE_TIMEOUT,
+ SSE_KEEPALIVE_INTERVAL,
+ SSE_QUEUE_TIMEOUT,
+)
+from utils.event_pipeline import process_event
from utils.logging import sensor_logger as logger
+from utils.responses import api_error, api_success
+from utils.sdr import SDRFactory, SDRType
+from utils.sse import sse_stream_fanout
from utils.validation import (
validate_device_index,
validate_gain,
@@ -30,15 +40,6 @@ from utils.validation import (
validate_rtl_tcp_host,
validate_rtl_tcp_port,
)
-from utils.sse import sse_stream_fanout
-from utils.event_pipeline import process_event
-from utils.sdr import SDRFactory, SDRType
-from utils.constants import (
- PROCESS_TERMINATE_TIMEOUT,
- SSE_KEEPALIVE_INTERVAL,
- SSE_QUEUE_TIMEOUT,
- PROCESS_START_WAIT,
-)
aprs_bp = Blueprint('aprs', __name__, url_prefix='/aprs')
@@ -75,27 +76,27 @@ METER_MIN_INTERVAL = 0.1 # Max 10 updates/sec
METER_MIN_CHANGE = 2 # Only send if level changes by at least this much
-def find_direwolf() -> Optional[str]:
+def find_direwolf() -> str | None:
"""Find direwolf binary."""
return shutil.which('direwolf')
-def find_multimon_ng() -> Optional[str]:
+def find_multimon_ng() -> str | None:
"""Find multimon-ng binary."""
return shutil.which('multimon-ng')
-def find_rtl_fm() -> Optional[str]:
+def find_rtl_fm() -> str | None:
"""Find rtl_fm binary."""
return shutil.which('rtl_fm')
-def find_rx_fm() -> Optional[str]:
+def find_rx_fm() -> str | None:
"""Find SoapySDR rx_fm binary."""
return shutil.which('rx_fm')
-def find_rtl_power() -> Optional[str]:
+def find_rtl_power() -> str | None:
"""Find rtl_power binary for spectrum scanning."""
return shutil.which('rtl_power')
@@ -142,7 +143,7 @@ def normalize_aprs_output_line(line: str) -> str:
return normalized
-def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
+def parse_aprs_packet(raw_packet: str) -> dict | None:
"""Parse APRS packet into structured data.
Supports all major APRS packet types:
@@ -431,7 +432,7 @@ def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
return None
-def parse_position(data: str) -> Optional[dict]:
+def parse_position(data: str) -> dict | None:
"""Parse APRS position data."""
try:
# Format: DDMM.mmN/DDDMM.mmW (or similar with symbols)
@@ -591,7 +592,7 @@ def parse_position(data: str) -> Optional[dict]:
return None
-def parse_object(data: str) -> Optional[dict]:
+def parse_object(data: str) -> dict | None:
"""Parse APRS object data.
Object format: ;OBJECTNAME*DDHHMMzPOSITION or ;OBJECTNAME_DDHHMMzPOSITION
@@ -649,7 +650,7 @@ def parse_object(data: str) -> Optional[dict]:
return None
-def parse_item(data: str) -> Optional[dict]:
+def parse_item(data: str) -> dict | None:
"""Parse APRS item data.
Item format: )ITEMNAME!POSITION or )ITEMNAME_POSITION
@@ -830,7 +831,7 @@ MIC_E_MESSAGE_TYPES = {
}
-def parse_mic_e(dest: str, data: str) -> Optional[dict]:
+def parse_mic_e(dest: str, data: str) -> dict | None:
"""Parse Mic-E encoded position from destination and data fields.
Mic-E is a highly compressed format that encodes:
@@ -973,7 +974,7 @@ def parse_mic_e(dest: str, data: str) -> Optional[dict]:
return None
-def parse_compressed_position(data: str) -> Optional[dict]:
+def parse_compressed_position(data: str) -> dict | None:
r"""Parse compressed position format (Base-91 encoding).
Compressed format: /YYYYXXXX$csT
@@ -1057,7 +1058,7 @@ def parse_compressed_position(data: str) -> Optional[dict]:
return None
-def parse_telemetry(data: str) -> Optional[dict]:
+def parse_telemetry(data: str) -> dict | None:
"""Parse APRS telemetry data.
Format: T#sss,aaa,aaa,aaa,aaa,aaa,bbbbbbbb
@@ -1122,7 +1123,7 @@ def parse_telemetry(data: str) -> Optional[dict]:
return None
-def parse_telemetry_definition(callsign: str, msg_type: str, content: str) -> Optional[dict]:
+def parse_telemetry_definition(callsign: str, msg_type: str, content: str) -> dict | None:
"""Parse telemetry definition messages (PARM, UNIT, EQNS, BITS).
These messages define the meaning of telemetry values for a station.
@@ -1174,7 +1175,7 @@ def parse_telemetry_definition(callsign: str, msg_type: str, content: str) -> Op
return None
-def parse_phg(data: str) -> Optional[dict]:
+def parse_phg(data: str) -> dict | None:
"""Parse PHG (Power/Height/Gain/Directivity) data.
Format: PHGphgd
@@ -1217,7 +1218,7 @@ def parse_phg(data: str) -> Optional[dict]:
return None
-def parse_rng(data: str) -> Optional[dict]:
+def parse_rng(data: str) -> dict | None:
"""Parse RNG (radio range) data.
Format: RNGrrrr where rrrr is range in miles.
@@ -1231,7 +1232,7 @@ def parse_rng(data: str) -> Optional[dict]:
return None
-def parse_df_report(data: str) -> Optional[dict]:
+def parse_df_report(data: str) -> dict | None:
"""Parse Direction Finding (DF) report.
Format: CSE/SPD/BRG/NRQ or similar patterns.
@@ -1260,7 +1261,7 @@ def parse_df_report(data: str) -> Optional[dict]:
return None
-def parse_timestamp(data: str) -> Optional[dict]:
+def parse_timestamp(data: str) -> dict | None:
"""Parse APRS timestamp from position data.
Formats:
@@ -1304,7 +1305,7 @@ def parse_timestamp(data: str) -> Optional[dict]:
return None
-def parse_third_party(data: str) -> Optional[dict]:
+def parse_third_party(data: str) -> dict | None:
"""Parse third-party traffic (packets relayed from another network).
Format: }CALL>PATH:DATA (the } indicates third-party)
@@ -1330,7 +1331,7 @@ def parse_third_party(data: str) -> Optional[dict]:
return None
-def parse_user_defined(data: str) -> Optional[dict]:
+def parse_user_defined(data: str) -> dict | None:
"""Parse user-defined data format.
Format: {UUXXXX...
@@ -1352,7 +1353,7 @@ def parse_user_defined(data: str) -> Optional[dict]:
return None
-def parse_capabilities(data: str) -> Optional[dict]:
+def parse_capabilities(data: str) -> dict | None:
"""Parse station capabilities response.
Format: Optional[dict]:
return None
-def parse_nmea(data: str) -> Optional[dict]:
+def parse_nmea(data: str) -> dict | None:
"""Parse raw GPS NMEA sentences.
APRS can include raw NMEA data starting with $.
@@ -1409,7 +1410,7 @@ def parse_nmea(data: str) -> Optional[dict]:
return None
-def parse_audio_level(line: str) -> Optional[int]:
+def parse_audio_level(line: str) -> int | None:
"""Parse direwolf audio level line and return normalized level (0-100).
Direwolf outputs lines like:
@@ -1579,10 +1580,8 @@ def stream_aprs_output(master_fd: int, rtl_process: subprocess.Popen, decoder_pr
logger.error(f"APRS stream error: {e}")
app_module.aprs_queue.put({'type': 'error', 'message': str(e)})
finally:
- try:
+ with contextlib.suppress(OSError):
os.close(master_fd)
- except OSError:
- pass
app_module.aprs_queue.put({'type': 'status', 'status': 'stopped'})
# Cleanup processes
for proc in [rtl_process, decoder_process]:
@@ -1590,10 +1589,8 @@ def stream_aprs_output(master_fd: int, rtl_process: subprocess.Popen, decoder_pr
proc.terminate()
proc.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
proc.kill()
- except Exception:
- pass
# Release SDR device — only if it's still ours (not reclaimed by a new start)
if my_device is not None and aprs_active_device == my_device:
app_module.release_sdr_device(my_device, aprs_active_sdr_type or 'rtlsdr')
@@ -1860,14 +1857,10 @@ def start_aprs() -> Response:
if stderr_output:
error_msg += f': {stderr_output[:500]}'
logger.error(error_msg)
- try:
+ with contextlib.suppress(OSError):
os.close(master_fd)
- except OSError:
- pass
- try:
+ with contextlib.suppress(Exception):
decoder_process.kill()
- except Exception:
- pass
if aprs_active_device is not None:
app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr')
aprs_active_device = None
@@ -1888,14 +1881,10 @@ def start_aprs() -> Response:
if error_output:
error_msg += f': {error_output}'
logger.error(error_msg)
- try:
+ with contextlib.suppress(OSError):
os.close(master_fd)
- except OSError:
- pass
- try:
+ with contextlib.suppress(Exception):
rtl_process.kill()
- except Exception:
- pass
if aprs_active_device is not None:
app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr')
aprs_active_device = None
@@ -1961,10 +1950,8 @@ def stop_aprs() -> Response:
# Close PTY master fd
if hasattr(app_module, 'aprs_master_fd') and app_module.aprs_master_fd is not None:
- try:
+ with contextlib.suppress(OSError):
os.close(app_module.aprs_master_fd)
- except OSError:
- pass
app_module.aprs_master_fd = None
app_module.aprs_process = None
@@ -2099,7 +2086,7 @@ def scan_aprs_spectrum() -> Response:
return api_error('rtl_power did not produce output file', 500)
bins = []
- with open(tmp_file, 'r') as f:
+ with open(tmp_file) as f:
reader = csv.reader(f)
for row in reader:
if len(row) < 7:
diff --git a/routes/audio_websocket.py b/routes/audio_websocket.py
index bbe17cb..9e65214 100644
--- a/routes/audio_websocket.py
+++ b/routes/audio_websocket.py
@@ -6,6 +6,7 @@ import socket
import subprocess
import threading
import time
+
from flask import Flask
# Try to import flask-sock
@@ -16,6 +17,8 @@ except ImportError:
WEBSOCKET_AVAILABLE = False
Sock = None
+import contextlib
+
from utils.logging import get_logger
logger = get_logger('intercept.audio_ws')
@@ -56,10 +59,8 @@ def kill_audio_processes():
audio_process.terminate()
audio_process.wait(timeout=0.5)
except:
- try:
+ with contextlib.suppress(BaseException):
audio_process.kill()
- except:
- pass
audio_process = None
if rtl_process:
@@ -67,10 +68,8 @@ def kill_audio_processes():
rtl_process.terminate()
rtl_process.wait(timeout=0.5)
except:
- try:
+ with contextlib.suppress(BaseException):
rtl_process.kill()
- except:
- pass
rtl_process = None
time.sleep(0.3)
@@ -261,16 +260,10 @@ def init_audio_websocket(app: Flask):
# Complete WebSocket close handshake, then shut down the
# raw socket so Werkzeug cannot write its HTTP 200 response
# on top of the WebSocket stream.
- try:
+ with contextlib.suppress(Exception):
ws.close()
- except Exception:
- pass
- try:
+ with contextlib.suppress(Exception):
ws.sock.shutdown(socket.SHUT_RDWR)
- except Exception:
- pass
- try:
+ with contextlib.suppress(Exception):
ws.sock.close()
- except Exception:
- pass
logger.info("WebSocket audio client disconnected")
diff --git a/routes/bluetooth.py b/routes/bluetooth.py
index 15de6b2..4564a8c 100644
--- a/routes/bluetooth.py
+++ b/routes/bluetooth.py
@@ -2,8 +2,7 @@
from __future__ import annotations
-import fcntl
-import json
+import contextlib
import os
import platform
import pty
@@ -13,30 +12,22 @@ import select
import subprocess
import threading
import time
-from typing import Any, Generator
+from typing import Any
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
-from utils.dependencies import check_tool
-from utils.logging import bluetooth_logger as logger
-from utils.sse import sse_stream_fanout
-from utils.event_pipeline import process_event
-from utils.validation import validate_bluetooth_interface
-from data.oui import OUI_DATABASE, load_oui_database, get_manufacturer
-from data.patterns import AIRTAG_PREFIXES, TILE_PREFIXES, SAMSUNG_TRACKER
+from data.oui import OUI_DATABASE, get_manufacturer, load_oui_database
+from data.patterns import AIRTAG_PREFIXES, SAMSUNG_TRACKER, TILE_PREFIXES
from utils.constants import (
- BT_TERMINATE_TIMEOUT,
- SSE_KEEPALIVE_INTERVAL,
- SSE_QUEUE_TIMEOUT,
SUBPROCESS_TIMEOUT_SHORT,
- SERVICE_ENUM_TIMEOUT,
- PROCESS_START_WAIT,
- BT_RESET_DELAY,
- BT_ADAPTER_DOWN_WAIT,
- PROCESS_TERMINATE_TIMEOUT,
)
+from utils.dependencies import check_tool
+from utils.event_pipeline import process_event
+from utils.logging import bluetooth_logger as logger
+from utils.responses import api_error, api_success
+from utils.sse import sse_stream_fanout
+from utils.validation import validate_bluetooth_interface
bluetooth_bp = Blueprint('bluetooth', __name__, url_prefix='/bt')
@@ -328,10 +319,8 @@ def stream_bt_scan(process, scan_mode):
except OSError:
break
- try:
+ with contextlib.suppress(OSError):
os.close(master_fd)
- except OSError:
- pass
except Exception as e:
app_module.bt_queue.put({'type': 'error', 'text': str(e)})
@@ -485,10 +474,8 @@ def reset_bt_adapter():
app_module.bt_process.terminate()
app_module.bt_process.wait(timeout=2)
except (subprocess.TimeoutExpired, OSError):
- try:
+ with contextlib.suppress(OSError):
app_module.bt_process.kill()
- except OSError:
- pass
app_module.bt_process = None
try:
@@ -507,7 +494,7 @@ def reset_bt_adapter():
return jsonify({
'status': 'success' if is_up else 'warning',
- 'message': f'Adapter {interface} reset' if is_up else f'Reset attempted but adapter may still be down',
+ 'message': f'Adapter {interface} reset' if is_up else 'Reset attempted but adapter may still be down',
'is_up': is_up
})
diff --git a/routes/bluetooth_v2.py b/routes/bluetooth_v2.py
index f152baa..5cb3bdc 100644
--- a/routes/bluetooth_v2.py
+++ b/routes/bluetooth_v2.py
@@ -7,31 +7,27 @@ aggregation, and heuristics.
from __future__ import annotations
+import contextlib
import csv
import io
import json
import logging
import threading
import time
+from collections.abc import Generator
from datetime import datetime
-from typing import Generator
-from flask import Blueprint, Response, jsonify, request, session
+from flask import Blueprint, Response, jsonify, request
from utils.bluetooth import (
- BluetoothScanner,
BTDeviceAggregate,
- get_bluetooth_scanner,
check_capabilities,
- RANGE_UNKNOWN,
- TrackerType,
- TrackerConfidence,
- get_tracker_engine,
+ get_bluetooth_scanner,
)
from utils.database import get_db
-from utils.responses import api_success, api_error
-from utils.sse import format_sse
from utils.event_pipeline import process_event
+from utils.responses import api_error
+from utils.sse import format_sse
logger = logging.getLogger('intercept.bluetooth_v2')
@@ -901,10 +897,8 @@ def stream_events():
"""Generate SSE events from scanner."""
for event in scanner.stream_events(timeout=1.0):
event_name, event_data = map_event_type(event)
- try:
+ with contextlib.suppress(Exception):
process_event('bluetooth', event_data, event_name)
- except Exception:
- pass
yield format_sse(event_data, event=event_name)
return Response(
@@ -972,7 +966,6 @@ def get_tscm_bluetooth_snapshot(duration: int = 8) -> list[dict]:
Returns:
List of device dictionaries in TSCM format.
"""
- import time
import logging
logger = logging.getLogger('intercept.bluetooth_v2')
diff --git a/routes/bt_locate.py b/routes/bt_locate.py
index b6e6dca..acc95b7 100644
--- a/routes/bt_locate.py
+++ b/routes/bt_locate.py
@@ -12,7 +12,6 @@ from collections.abc import Generator
from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
from utils.bluetooth.irk_extractor import get_paired_irks
from utils.bt_locate import (
Environment,
@@ -22,6 +21,7 @@ from utils.bt_locate import (
start_locate_session,
stop_locate_session,
)
+from utils.responses import api_error
from utils.sse import format_sse
logger = logging.getLogger('intercept.bt_locate')
diff --git a/routes/controller.py b/routes/controller.py
index 5f94075..8d0a9a0 100644
--- a/routes/controller.py
+++ b/routes/controller.py
@@ -10,30 +10,34 @@ This blueprint provides:
from __future__ import annotations
-import json
import logging
import queue
import threading
import time
+from collections.abc import Generator
from datetime import datetime, timezone
-from typing import Generator
import requests
+from flask import Blueprint, Response, jsonify, request
-from flask import Blueprint, jsonify, request, Response
-
-from utils.responses import api_success, api_error
+from utils.agent_client import AgentClient, AgentConnectionError, AgentHTTPError, create_client_from_agent
from utils.database import (
- create_agent, get_agent, get_agent_by_name, list_agents,
- update_agent, delete_agent, store_push_payload, get_recent_payloads
-)
-from utils.agent_client import (
- AgentClient, AgentHTTPError, AgentConnectionError, create_client_from_agent
+ create_agent,
+ delete_agent,
+ get_agent,
+ get_agent_by_name,
+ get_recent_payloads,
+ list_agents,
+ store_push_payload,
+ update_agent,
)
+from utils.responses import api_error
from utils.sse import format_sse
from utils.trilateration import (
- DeviceLocationTracker, PathLossModel, Trilateration,
- AgentObservation, estimate_location_from_observations
+ DeviceLocationTracker,
+ PathLossModel,
+ Trilateration,
+ estimate_location_from_observations,
)
logger = logging.getLogger('intercept.controller')
@@ -700,6 +704,7 @@ def stream_all_agents():
def agent_management_page():
"""Render the agent management page."""
from flask import render_template
+
from config import VERSION
return render_template('agents.html', version=VERSION)
diff --git a/routes/correlation.py b/routes/correlation.py
index 9097101..c4fd678 100644
--- a/routes/correlation.py
+++ b/routes/correlation.py
@@ -2,12 +2,12 @@
from __future__ import annotations
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, request
import app as app_module
from utils.correlation import get_correlations
-from utils.responses import api_success, api_error
from utils.logging import get_logger
+from utils.responses import api_error, api_success
logger = get_logger('intercept.correlation')
diff --git a/routes/dsc.py b/routes/dsc.py
index 4012674..cd458a6 100644
--- a/routes/dsc.py
+++ b/routes/dsc.py
@@ -6,7 +6,7 @@ distress and safety communications per ITU-R M.493.
from __future__ import annotations
-import json
+import contextlib
import logging
import os
import pty
@@ -16,37 +16,36 @@ import shutil
import subprocess
import threading
import time
-from datetime import datetime
-from typing import Any, Generator
+from typing import Any
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
from utils.constants import (
- DSC_VHF_FREQUENCY_MHZ,
DSC_SAMPLE_RATE,
DSC_TERMINATE_TIMEOUT,
+ DSC_VHF_FREQUENCY_MHZ,
)
from utils.database import (
- store_dsc_alert,
- get_dsc_alerts,
- get_dsc_alert,
acknowledge_dsc_alert,
+ get_dsc_alert,
get_dsc_alert_summary,
+ get_dsc_alerts,
+ store_dsc_alert,
)
+from utils.dependencies import get_tool_path
from utils.dsc.parser import parse_dsc_message
-from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
+from utils.process import register_process, unregister_process
+from utils.responses import api_error
+from utils.sdr import SDRFactory, SDRType
+from utils.sse import sse_stream_fanout
from utils.validation import (
validate_device_index,
validate_gain,
validate_rtl_tcp_host,
validate_rtl_tcp_port,
)
-from utils.sdr import SDRFactory, SDRType
-from utils.dependencies import get_tool_path
-from utils.process import register_process, unregister_process
logger = logging.getLogger('intercept.dsc')
@@ -83,8 +82,8 @@ def _check_dsc_tools() -> dict:
# Check for scipy/numpy (needed for decoder)
scipy_available = False
try:
- import scipy
import numpy
+ import scipy
scipy_available = True
except ImportError:
pass
@@ -179,10 +178,8 @@ def stream_dsc_decoder(master_fd: int, decoder_process: subprocess.Popen) -> Non
})
finally:
global dsc_active_device, dsc_active_sdr_type
- try:
+ with contextlib.suppress(OSError):
os.close(master_fd)
- except OSError:
- pass
dsc_running = False
# Cleanup both processes
with app_module.dsc_lock:
@@ -193,10 +190,8 @@ def stream_dsc_decoder(master_fd: int, decoder_process: subprocess.Popen) -> Non
proc.terminate()
proc.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
proc.kill()
- except Exception:
- pass
unregister_process(proc)
app_module.dsc_queue.put({'type': 'status', 'status': 'stopped'})
with app_module.dsc_lock:
@@ -466,10 +461,8 @@ def start_decoding() -> Response:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
rtl_process.kill()
- except Exception:
- pass
# Release device on failure
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr')
@@ -485,10 +478,8 @@ def start_decoding() -> Response:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
rtl_process.kill()
- except Exception:
- pass
# Release device on failure
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr')
@@ -518,10 +509,8 @@ def stop_decoding() -> Response:
app_module.dsc_rtl_process.terminate()
app_module.dsc_rtl_process.wait(timeout=DSC_TERMINATE_TIMEOUT)
except subprocess.TimeoutExpired:
- try:
+ with contextlib.suppress(OSError):
app_module.dsc_rtl_process.kill()
- except OSError:
- pass
except OSError:
pass
@@ -531,10 +520,8 @@ def stop_decoding() -> Response:
app_module.dsc_process.terminate()
app_module.dsc_process.wait(timeout=DSC_TERMINATE_TIMEOUT)
except subprocess.TimeoutExpired:
- try:
+ with contextlib.suppress(OSError):
app_module.dsc_process.kill()
- except OSError:
- pass
except OSError:
pass
diff --git a/routes/gps.py b/routes/gps.py
index 23b1ce7..7e777f0 100644
--- a/routes/gps.py
+++ b/routes/gps.py
@@ -3,12 +3,9 @@
from __future__ import annotations
import queue
-import time
-from collections.abc import Generator
from flask import Blueprint, Response, jsonify
-from utils.responses import api_success, api_error
from utils.gps import (
GPSPosition,
GPSSkyData,
diff --git a/routes/listening_post/__init__.py b/routes/listening_post/__init__.py
index b7926b6..b6e7e5a 100644
--- a/routes/listening_post/__init__.py
+++ b/routes/listening_post/__init__.py
@@ -11,8 +11,8 @@ from __future__ import annotations
import os
import queue
-import signal
import shutil
+import signal
import struct
import subprocess
import threading
@@ -22,15 +22,15 @@ from typing import Dict, List, Optional
from flask import Blueprint
-from utils.logging import get_logger
-from utils.sse import sse_stream_fanout
-from utils.event_pipeline import process_event
from utils.constants import (
- SSE_QUEUE_TIMEOUT,
- SSE_KEEPALIVE_INTERVAL,
PROCESS_TERMINATE_TIMEOUT,
+ SSE_KEEPALIVE_INTERVAL,
+ SSE_QUEUE_TIMEOUT,
)
+from utils.event_pipeline import process_event
+from utils.logging import get_logger
from utils.sdr import SDRFactory, SDRType
+from utils.sse import sse_stream_fanout
logger = get_logger('intercept.receiver')
@@ -39,6 +39,8 @@ receiver_bp = Blueprint('receiver', __name__, url_prefix='/receiver')
# Deferred import to avoid circular import at module load time.
# app.py -> register_blueprints -> from .listening_post import receiver_bp
# must find receiver_bp already defined (above) before this import runs.
+import contextlib
+
import app as app_module # noqa: E402
# ============================================
@@ -57,16 +59,16 @@ audio_source = 'process'
audio_start_token = 0
# Scanner state
-scanner_thread: Optional[threading.Thread] = None
+scanner_thread: threading.Thread | None = None
scanner_running = False
scanner_lock = threading.Lock()
scanner_paused = False
scanner_current_freq = 0.0
-scanner_active_device: Optional[int] = None
+scanner_active_device: int | None = None
scanner_active_sdr_type: str = 'rtlsdr'
-receiver_active_device: Optional[int] = None
+receiver_active_device: int | None = None
receiver_active_sdr_type: str = 'rtlsdr'
-scanner_power_process: Optional[subprocess.Popen] = None
+scanner_power_process: subprocess.Popen | None = None
scanner_config = {
'start_freq': 88.0,
'end_freq': 108.0,
@@ -84,7 +86,7 @@ scanner_config = {
}
# Activity log
-activity_log: List[Dict] = []
+activity_log: list[dict] = []
activity_log_lock = threading.Lock()
MAX_LOG_ENTRIES = 500
@@ -95,12 +97,12 @@ scanner_queue: queue.Queue = queue.Queue(maxsize=100)
scanner_skip_signal = False
# Waterfall / spectrogram state
-waterfall_process: Optional[subprocess.Popen] = None
-waterfall_thread: Optional[threading.Thread] = None
+waterfall_process: subprocess.Popen | None = None
+waterfall_thread: threading.Thread | None = None
waterfall_running = False
waterfall_lock = threading.Lock()
waterfall_queue: queue.Queue = queue.Queue(maxsize=200)
-waterfall_active_device: Optional[int] = None
+waterfall_active_device: int | None = None
waterfall_active_sdr_type: str = 'rtlsdr'
waterfall_config = {
'start_freq': 88.0,
@@ -185,13 +187,11 @@ def add_activity_log(event_type: str, frequency: float, details: str = ''):
activity_log.pop()
# Also push to SSE queue
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'log',
'entry': entry
})
- except queue.Full:
- pass
def _start_audio_stream(
@@ -348,12 +348,12 @@ def _start_audio_stream(
rtl_stderr = ''
ffmpeg_stderr = ''
try:
- with open(rtl_stderr_log, 'r') as f:
+ with open(rtl_stderr_log) as f:
rtl_stderr = f.read().strip()
except Exception:
pass
try:
- with open(ffmpeg_stderr_log, 'r') as f:
+ with open(ffmpeg_stderr_log) as f:
ffmpeg_stderr = f.read().strip()
except Exception:
pass
@@ -502,10 +502,8 @@ def _stop_waterfall_internal() -> None:
waterfall_process.terminate()
waterfall_process.wait(timeout=1)
except Exception:
- try:
+ with contextlib.suppress(Exception):
waterfall_process.kill()
- except Exception:
- pass
waterfall_process = None
if waterfall_active_device is not None:
@@ -517,7 +515,9 @@ def _stop_waterfall_internal() -> None:
# ============================================
# Import sub-modules to register routes on receiver_bp
# ============================================
-from . import scanner # noqa: E402, F401
-from . import audio # noqa: E402, F401
-from . import waterfall # noqa: E402, F401
-from . import tools # noqa: E402, F401
+from . import (
+ audio, # noqa: E402, F401
+ scanner, # noqa: E402, F401
+ tools, # noqa: E402, F401
+ waterfall, # noqa: E402, F401
+)
diff --git a/routes/listening_post/audio.py b/routes/listening_post/audio.py
index 467bc57..39133b0 100644
--- a/routes/listening_post/audio.py
+++ b/routes/listening_post/audio.py
@@ -2,27 +2,27 @@
from __future__ import annotations
+import contextlib
import os
import select
import subprocess
import time
-from typing import Any
-from flask import jsonify, request, Response
+from flask import Response, jsonify, request
+
+import routes.listening_post as _state
from . import (
- receiver_bp,
- logger,
- app_module,
- scanner_config,
- _wav_header,
_start_audio_stream,
_stop_audio_stream,
_stop_waterfall_internal,
+ _wav_header,
+ app_module,
+ logger,
normalize_modulation,
+ receiver_bp,
+ scanner_config,
)
-import routes.listening_post as _state
-
# ============================================
# MANUAL AUDIO ENDPOINTS (for direct listening)
@@ -106,23 +106,17 @@ def start_audio() -> Response:
# Scanner teardown outside lock (blocking: thread join, process wait, pkill, sleep)
if need_scanner_teardown:
if scanner_thread_ref and scanner_thread_ref.is_alive():
- try:
+ with contextlib.suppress(Exception):
scanner_thread_ref.join(timeout=2.0)
- except Exception:
- pass
if scanner_proc_ref and scanner_proc_ref.poll() is None:
try:
scanner_proc_ref.terminate()
scanner_proc_ref.wait(timeout=1)
except Exception:
- try:
+ with contextlib.suppress(Exception):
scanner_proc_ref.kill()
- except Exception:
- pass
- try:
+ with contextlib.suppress(Exception):
subprocess.run(['pkill', '-9', 'rtl_power'], capture_output=True, timeout=0.5)
- except Exception:
- pass
time.sleep(0.5)
# Re-acquire lock for waterfall check and device claim
@@ -232,7 +226,7 @@ def start_audio() -> Response:
start_error = ''
for log_path in ('/tmp/rtl_fm_stderr.log', '/tmp/ffmpeg_stderr.log'):
try:
- with open(log_path, 'r') as handle:
+ with open(log_path) as handle:
content = handle.read().strip()
if content:
start_error = content.splitlines()[-1]
@@ -290,7 +284,7 @@ def audio_debug() -> Response:
def _read_log(path: str) -> str:
try:
- with open(path, 'r') as handle:
+ with open(path) as handle:
return handle.read().strip()
except Exception:
return ''
diff --git a/routes/listening_post/scanner.py b/routes/listening_post/scanner.py
index 61b8b3c..01fa090 100644
--- a/routes/listening_post/scanner.py
+++ b/routes/listening_post/scanner.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import contextlib
import math
import queue
import struct
@@ -10,32 +11,32 @@ import threading
import time
from typing import Any
-from flask import jsonify, request, Response
+from flask import Response, jsonify, request
+
+import routes.listening_post as _state
from . import (
- receiver_bp,
- logger,
- app_module,
- scanner_queue,
- scanner_config,
- scanner_lock,
- activity_log,
- activity_log_lock,
- add_activity_log,
- find_rtl_fm,
- find_rtl_power,
- find_rx_fm,
- normalize_modulation,
+ SSE_KEEPALIVE_INTERVAL,
+ SSE_QUEUE_TIMEOUT,
_rtl_fm_demod_mode,
_start_audio_stream,
_stop_audio_stream,
+ activity_log,
+ activity_log_lock,
+ add_activity_log,
+ app_module,
+ find_rtl_fm,
+ find_rtl_power,
+ find_rx_fm,
+ logger,
+ normalize_modulation,
process_event,
+ receiver_bp,
+ scanner_config,
+ scanner_lock,
+ scanner_queue,
sse_stream_fanout,
- SSE_QUEUE_TIMEOUT,
- SSE_KEEPALIVE_INTERVAL,
)
-import routes.listening_post as _state
-
# ============================================
# SCANNER IMPLEMENTATION
@@ -76,7 +77,7 @@ def scanner_loop():
_state.scanner_current_freq = current_freq
# Notify clients of frequency change
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'freq_change',
'frequency': current_freq,
@@ -84,8 +85,6 @@ def scanner_loop():
'range_start': scanner_config['start_freq'],
'range_end': scanner_config['end_freq']
})
- except queue.Full:
- pass
# Start rtl_fm at this frequency
freq_hz = int(current_freq * 1e6)
@@ -168,7 +167,7 @@ def scanner_loop():
audio_detected = rms > effective_threshold
# Send level info to clients
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'scan_update',
'frequency': current_freq,
@@ -178,8 +177,6 @@ def scanner_loop():
'range_start': scanner_config['start_freq'],
'range_end': scanner_config['end_freq']
})
- except queue.Full:
- pass
if audio_detected and _state.scanner_running:
if not signal_detected:
@@ -214,13 +211,11 @@ def scanner_loop():
_state.scanner_skip_signal = False
signal_detected = False
_stop_audio_stream()
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'signal_skipped',
'frequency': current_freq
})
- except queue.Full:
- pass
# Move to next frequency (step is in kHz, convert to MHz)
current_freq += step_mhz
if current_freq > scanner_config['end_freq']:
@@ -240,15 +235,13 @@ def scanner_loop():
if _state.scanner_running and not _state.scanner_skip_signal:
signal_detected = False
_stop_audio_stream()
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'signal_lost',
'frequency': current_freq,
'range_start': scanner_config['start_freq'],
'range_end': scanner_config['end_freq']
})
- except queue.Full:
- pass
current_freq += step_mhz
if current_freq > scanner_config['end_freq']:
@@ -268,13 +261,11 @@ def scanner_loop():
# Stop audio
_stop_audio_stream()
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'signal_lost',
'frequency': current_freq
})
- except queue.Full:
- pass
# Move to next frequency (step is in kHz, convert to MHz)
current_freq += step_mhz
@@ -321,7 +312,7 @@ def scanner_loop_power():
step_khz = scanner_config['step']
gain = scanner_config['gain']
device = scanner_config['device']
- squelch = scanner_config['squelch']
+ scanner_config['squelch']
mod = scanner_config['modulation']
# Configure sweep
@@ -355,7 +346,7 @@ def scanner_loop_power():
if not stdout:
add_activity_log('error', start_mhz, 'Power sweep produced no data')
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'scan_update',
'frequency': end_mhz,
@@ -365,8 +356,6 @@ def scanner_loop_power():
'range_start': scanner_config['start_freq'],
'range_end': scanner_config['end_freq']
})
- except queue.Full:
- pass
time.sleep(0.2)
continue
@@ -414,7 +403,7 @@ def scanner_loop_power():
if not segments:
add_activity_log('error', start_mhz, 'Power sweep bins missing')
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'scan_update',
'frequency': end_mhz,
@@ -424,8 +413,6 @@ def scanner_loop_power():
'range_start': scanner_config['start_freq'],
'range_end': scanner_config['end_freq']
})
- except queue.Full:
- pass
time.sleep(0.2)
continue
@@ -457,7 +444,7 @@ def scanner_loop_power():
level = int(max(0, snr) * 100)
threshold = int(snr_threshold * 100)
progress = min(1.0, (segment_offset + idx) / max(1, total_bins - 1))
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'scan_update',
'frequency': _state.scanner_current_freq,
@@ -468,8 +455,6 @@ def scanner_loop_power():
'range_start': scanner_config['start_freq'],
'range_end': scanner_config['end_freq']
})
- except queue.Full:
- pass
segment_offset += len(bin_values)
# Detect peaks (clusters above threshold)
@@ -505,7 +490,7 @@ def scanner_loop_power():
threshold = int(snr_threshold * 100)
add_activity_log('signal_found', freq_mhz,
f'Peak detected at {freq_mhz:.3f} MHz ({mod.upper()})')
- try:
+ with contextlib.suppress(queue.Full):
scanner_queue.put_nowait({
'type': 'signal_found',
'frequency': freq_mhz,
@@ -517,8 +502,6 @@ def scanner_loop_power():
'range_start': scanner_config['start_freq'],
'range_end': scanner_config['end_freq']
})
- except queue.Full:
- pass
add_activity_log('scan_cycle', start_mhz, 'Power sweep complete')
time.sleep(max(0.1, scanner_config.get('scan_delay', 0.5)))
@@ -590,9 +573,8 @@ def start_scanner() -> Response:
sdr_type = scanner_config['sdr_type']
# Power scan only supports RTL-SDR for now
- if scanner_config['scan_method'] == 'power':
- if sdr_type != 'rtlsdr' or not find_rtl_power():
- scanner_config['scan_method'] = 'classic'
+ if scanner_config['scan_method'] == 'power' and (sdr_type != 'rtlsdr' or not find_rtl_power()):
+ scanner_config['scan_method'] = 'classic'
# Check tools based on chosen method
if scanner_config['scan_method'] == 'power':
@@ -666,10 +648,8 @@ def stop_scanner() -> Response:
_state.scanner_power_process.terminate()
_state.scanner_power_process.wait(timeout=1)
except Exception:
- try:
+ with contextlib.suppress(Exception):
_state.scanner_power_process.kill()
- except Exception:
- pass
_state.scanner_power_process = None
if _state.scanner_active_device is not None:
app_module.release_sdr_device(_state.scanner_active_device, _state.scanner_active_sdr_type)
diff --git a/routes/listening_post/tools.py b/routes/listening_post/tools.py
index f426ced..0e9dc42 100644
--- a/routes/listening_post/tools.py
+++ b/routes/listening_post/tools.py
@@ -2,18 +2,17 @@
from __future__ import annotations
-from flask import jsonify, request, Response
+from flask import Response, jsonify, request
from . import (
- receiver_bp,
- logger,
+ find_ffmpeg,
find_rtl_fm,
find_rtl_power,
find_rx_fm,
- find_ffmpeg,
+ logger,
+ receiver_bp,
)
-
# ============================================
# TOOL CHECK ENDPOINT
# ============================================
diff --git a/routes/listening_post/waterfall.py b/routes/listening_post/waterfall.py
index c6ec3c5..20daa55 100644
--- a/routes/listening_post/waterfall.py
+++ b/routes/listening_post/waterfall.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import contextlib
import math
import queue
import struct
@@ -11,23 +12,23 @@ import time
from datetime import datetime
from typing import Any
-from flask import jsonify, request, Response
+from flask import Response, jsonify, request
-from . import (
- receiver_bp,
- logger,
- app_module,
- _stop_waterfall_internal,
- process_event,
- sse_stream_fanout,
- SSE_QUEUE_TIMEOUT,
- SSE_KEEPALIVE_INTERVAL,
- find_rtl_power,
- SDRFactory,
- SDRType,
-)
import routes.listening_post as _state
+from . import (
+ SSE_KEEPALIVE_INTERVAL,
+ SSE_QUEUE_TIMEOUT,
+ SDRFactory,
+ SDRType,
+ _stop_waterfall_internal,
+ app_module,
+ find_rtl_power,
+ logger,
+ process_event,
+ receiver_bp,
+ sse_stream_fanout,
+)
# ============================================
# WATERFALL HELPER FUNCTIONS
@@ -75,14 +76,12 @@ def _parse_rtl_power_line(line: str) -> tuple[str | None, float | None, float |
def _queue_waterfall_error(message: str) -> None:
"""Push an error message onto the waterfall SSE queue."""
- try:
+ with contextlib.suppress(queue.Full):
_state.waterfall_queue.put_nowait({
'type': 'waterfall_error',
'message': message,
'timestamp': datetime.now().isoformat(),
})
- except queue.Full:
- pass
def _downsample_bins(values: list[float], target: int) -> list[float]:
@@ -229,14 +228,10 @@ def _waterfall_loop_iq(sdr_type: SDRType):
try:
_state.waterfall_queue.put_nowait(msg)
except queue.Full:
- try:
+ with contextlib.suppress(queue.Empty):
_state.waterfall_queue.get_nowait()
- except queue.Empty:
- pass
- try:
+ with contextlib.suppress(queue.Full):
_state.waterfall_queue.put_nowait(msg)
- except queue.Full:
- pass
# Throttle to respect interval
time.sleep(interval)
@@ -254,10 +249,8 @@ def _waterfall_loop_iq(sdr_type: SDRType):
_state.waterfall_process.terminate()
_state.waterfall_process.wait(timeout=1)
except Exception:
- try:
+ with contextlib.suppress(Exception):
_state.waterfall_process.kill()
- except Exception:
- pass
_state.waterfall_process = None
logger.info("Waterfall IQ loop stopped")
@@ -346,14 +339,10 @@ def _waterfall_loop_rtl_power():
try:
_state.waterfall_queue.put_nowait(msg)
except queue.Full:
- try:
+ with contextlib.suppress(queue.Empty):
_state.waterfall_queue.get_nowait()
- except queue.Empty:
- pass
- try:
+ with contextlib.suppress(queue.Full):
_state.waterfall_queue.put_nowait(msg)
- except queue.Full:
- pass
all_bins = []
sweep_start_hz = start_hz
@@ -379,10 +368,8 @@ def _waterfall_loop_rtl_power():
'bins': bins_to_send,
'timestamp': datetime.now().isoformat(),
}
- try:
+ with contextlib.suppress(queue.Full):
_state.waterfall_queue.put_nowait(msg)
- except queue.Full:
- pass
if _state.waterfall_running and not received_any:
_queue_waterfall_error('No waterfall FFT data received from rtl_power')
@@ -397,10 +384,8 @@ def _waterfall_loop_rtl_power():
_state.waterfall_process.terminate()
_state.waterfall_process.wait(timeout=1)
except Exception:
- try:
+ with contextlib.suppress(Exception):
_state.waterfall_process.kill()
- except Exception:
- pass
_state.waterfall_process = None
logger.info("Waterfall loop stopped")
@@ -432,9 +417,8 @@ def start_waterfall() -> Response:
sdr_type_str = sdr_type.value
# RTL-SDR uses rtl_power; other types use rx_sdr via IQ capture
- if sdr_type == SDRType.RTL_SDR:
- if not find_rtl_power():
- return jsonify({'status': 'error', 'message': 'rtl_power not found'}), 503
+ if sdr_type == SDRType.RTL_SDR and not find_rtl_power():
+ return jsonify({'status': 'error', 'message': 'rtl_power not found'}), 503
try:
_state.waterfall_config['start_freq'] = float(data.get('start_freq', 88.0))
diff --git a/routes/meshtastic.py b/routes/meshtastic.py
index 79477ab..5c4f9f1 100644
--- a/routes/meshtastic.py
+++ b/routes/meshtastic.py
@@ -11,21 +11,19 @@ Supports multiple connection types:
from __future__ import annotations
import queue
-import time
-from typing import Generator
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
from utils.logging import get_logger
-from utils.sse import sse_stream_fanout
from utils.meshtastic import (
+ MeshtasticMessage,
get_meshtastic_client,
+ is_meshtastic_available,
start_meshtastic,
stop_meshtastic,
- is_meshtastic_available,
- MeshtasticMessage,
)
+from utils.responses import api_error
+from utils.sse import sse_stream_fanout
logger = get_logger('intercept.meshtastic')
diff --git a/routes/meteor_websocket.py b/routes/meteor_websocket.py
index c13ccb7..3a0c560 100644
--- a/routes/meteor_websocket.py
+++ b/routes/meteor_websocket.py
@@ -20,7 +20,7 @@ from typing import Any
from flask import Blueprint, Flask, Response, jsonify, request
-from utils.responses import api_success, api_error
+from utils.responses import api_error
try:
from flask_sock import Sock
diff --git a/routes/morse.py b/routes/morse.py
index a84b296..c5b3a4f 100644
--- a/routes/morse.py
+++ b/routes/morse.py
@@ -13,7 +13,6 @@ from typing import Any
from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
from utils.event_pipeline import process_event
from utils.logging import sensor_logger as logger
@@ -22,6 +21,7 @@ from utils.morse import (
morse_decoder_thread,
)
from utils.process import register_process, safe_terminate, unregister_process
+from utils.responses import api_error
from utils.sdr import SDRFactory, SDRType
from utils.sse import sse_stream_fanout
from utils.validation import (
diff --git a/routes/offline.py b/routes/offline.py
index 6e8c4c0..63ca3f1 100644
--- a/routes/offline.py
+++ b/routes/offline.py
@@ -2,11 +2,13 @@
Offline mode routes - Asset management and settings for offline operation.
"""
-from flask import Blueprint, jsonify, request
-from utils.database import get_setting, set_setting
-from utils.responses import api_success, api_error
import os
+from flask import Blueprint, request
+
+from utils.database import get_setting, set_setting
+from utils.responses import api_error, api_success
+
offline_bp = Blueprint('offline', __name__, url_prefix='/offline')
# Default offline settings
diff --git a/routes/ook.py b/routes/ook.py
index 79ca5a1..2d05ea3 100644
--- a/routes/ook.py
+++ b/routes/ook.py
@@ -19,10 +19,10 @@ from flask import Blueprint, Response, jsonify, request
import app as app_module
from utils.event_pipeline import process_event
-from utils.responses import api_success, api_error
from utils.logging import sensor_logger as logger
from utils.ook import ook_parser_thread
from utils.process import register_process, safe_terminate, unregister_process
+from utils.responses import api_error
from utils.sdr import SDRFactory, SDRType
from utils.sse import sse_stream_fanout
from utils.validation import (
diff --git a/routes/pager.py b/routes/pager.py
index 991a951..1c4ee26 100644
--- a/routes/pager.py
+++ b/routes/pager.py
@@ -2,34 +2,39 @@
from __future__ import annotations
+import contextlib
import math
import os
import pathlib
-import re
import pty
import queue
+import re
import select
import struct
import subprocess
import threading
import time
from datetime import datetime
-from typing import Any, Generator
+from typing import Any
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
-from utils.logging import pager_logger as logger
-from utils.validation import (
- validate_frequency, validate_device_index, validate_gain, validate_ppm,
- validate_rtl_tcp_host, validate_rtl_tcp_port
-)
-from utils.sse import sse_stream_fanout
-from utils.event_pipeline import process_event
-from utils.process import safe_terminate, register_process, unregister_process
-from utils.sdr import SDRFactory, SDRType, SDRValidationError
from utils.dependencies import get_tool_path
+from utils.event_pipeline import process_event
+from utils.logging import pager_logger as logger
+from utils.process import register_process, unregister_process
+from utils.responses import api_error
+from utils.sdr import SDRFactory, SDRType
+from utils.sse import sse_stream_fanout
+from utils.validation import (
+ validate_device_index,
+ validate_frequency,
+ validate_gain,
+ validate_ppm,
+ validate_rtl_tcp_host,
+ validate_rtl_tcp_port,
+)
pager_bp = Blueprint('pager', __name__)
@@ -189,10 +194,8 @@ def audio_relay_thread(
except Exception as e:
logger.debug(f"Audio relay error: {e}")
finally:
- try:
+ with contextlib.suppress(OSError):
multimon_stdin.close()
- except OSError:
- pass
def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
@@ -237,10 +240,8 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
app_module.output_queue.put({'type': 'error', 'text': str(e)})
finally:
global pager_active_device, pager_active_sdr_type
- try:
+ with contextlib.suppress(OSError):
os.close(master_fd)
- except OSError:
- pass
# Signal relay thread to stop
with app_module.process_lock:
stop_relay = getattr(app_module.current_process, '_stop_relay', None)
@@ -255,10 +256,8 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
proc.terminate()
proc.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
proc.kill()
- except Exception:
- pass
unregister_process(proc)
app_module.output_queue.put({'type': 'status', 'text': 'stopped'})
with app_module.process_lock:
@@ -454,10 +453,8 @@ def start_decoding() -> Response:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
rtl_process.kill()
- except Exception:
- pass
# Release device on failure
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr')
@@ -470,10 +467,8 @@ def start_decoding() -> Response:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
rtl_process.kill()
- except Exception:
- pass
# Release device on failure
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr')
@@ -498,17 +493,13 @@ def stop_decoding() -> Response:
app_module.current_process._rtl_process.terminate()
app_module.current_process._rtl_process.wait(timeout=2)
except (subprocess.TimeoutExpired, OSError):
- try:
+ with contextlib.suppress(OSError):
app_module.current_process._rtl_process.kill()
- except OSError:
- pass
# Close PTY master fd
if hasattr(app_module.current_process, '_master_fd'):
- try:
+ with contextlib.suppress(OSError):
os.close(app_module.current_process._master_fd)
- except OSError:
- pass
# Kill multimon-ng
app_module.current_process.terminate()
diff --git a/routes/radiosonde.py b/routes/radiosonde.py
index d09808c..b11e840 100644
--- a/routes/radiosonde.py
+++ b/routes/radiosonde.py
@@ -7,6 +7,7 @@ telemetry (position, altitude, temperature, humidity, pressure) on the
from __future__ import annotations
+import contextlib
import json
import os
import queue
@@ -20,7 +21,6 @@ from typing import Any
from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
from utils.constants import (
MAX_RADIOSONDE_AGE_SECONDS,
@@ -32,6 +32,7 @@ from utils.constants import (
)
from utils.gps import is_gpsd_running
from utils.logging import get_logger
+from utils.responses import api_error, api_success
from utils.sdr import SDRFactory, SDRType
from utils.sse import sse_stream_fanout
from utils.validation import (
@@ -270,7 +271,7 @@ def _fix_data_ownership(path: str) -> None:
return
try:
uid_int, gid_int = int(uid), int(gid)
- for dirpath, dirnames, filenames in os.walk(path):
+ for dirpath, _dirnames, filenames in os.walk(path):
os.chown(dirpath, uid_int, gid_int)
for fname in filenames:
os.chown(os.path.join(dirpath, fname), uid_int, gid_int)
@@ -315,18 +316,14 @@ def parse_radiosonde_udp(udp_port: int) -> None:
if serial:
with _balloons_lock:
radiosonde_balloons[serial] = balloon
- try:
+ with contextlib.suppress(queue.Full):
app_module.radiosonde_queue.put_nowait({
'type': 'balloon',
**balloon,
})
- except queue.Full:
- pass
- try:
+ with contextlib.suppress(OSError):
sock.close()
- except OSError:
- pass
_udp_socket = None
logger.info("Radiosonde UDP listener stopped")
@@ -354,71 +351,51 @@ def _process_telemetry(msg: dict) -> dict | None:
# Position
for key in ('lat', 'latitude'):
if key in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['lat'] = float(msg[key])
- except (ValueError, TypeError):
- pass
break
for key in ('lon', 'longitude'):
if key in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['lon'] = float(msg[key])
- except (ValueError, TypeError):
- pass
break
# Altitude (metres)
if 'alt' in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['alt'] = float(msg['alt'])
- except (ValueError, TypeError):
- pass
# Meteorological data
for field in ('temp', 'humidity', 'pressure'):
if field in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon[field] = float(msg[field])
- except (ValueError, TypeError):
- pass
# Velocity
if 'vel_h' in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['vel_h'] = float(msg['vel_h'])
- except (ValueError, TypeError):
- pass
if 'vel_v' in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['vel_v'] = float(msg['vel_v'])
- except (ValueError, TypeError):
- pass
if 'heading' in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['heading'] = float(msg['heading'])
- except (ValueError, TypeError):
- pass
# GPS satellites
if 'sats' in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['sats'] = int(msg['sats'])
- except (ValueError, TypeError):
- pass
# Battery voltage
if 'batt' in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['batt'] = float(msg['batt'])
- except (ValueError, TypeError):
- pass
# Frequency
if 'freq' in msg:
- try:
+ with contextlib.suppress(ValueError, TypeError):
balloon['freq'] = float(msg['freq'])
- except (ValueError, TypeError):
- pass
balloon['last_seen'] = time.time()
return balloon
@@ -612,12 +589,10 @@ def start_radiosonde():
app_module.release_sdr_device(device_int, sdr_type_str)
stderr_output = ''
if app_module.radiosonde_process.stderr:
- try:
+ with contextlib.suppress(Exception):
stderr_output = app_module.radiosonde_process.stderr.read().decode(
'utf-8', errors='ignore'
).strip()
- except Exception:
- pass
if stderr_output:
logger.error(f"radiosonde_auto_rx stderr:\n{stderr_output}")
if stderr_output and (
@@ -686,10 +661,8 @@ def stop_radiosonde():
# Close UDP socket to unblock listener thread
if _udp_socket:
- try:
+ with contextlib.suppress(OSError):
_udp_socket.close()
- except OSError:
- pass
_udp_socket = None
# Release SDR device
diff --git a/routes/recordings.py b/routes/recordings.py
index b74446e..02b620b 100644
--- a/routes/recordings.py
+++ b/routes/recordings.py
@@ -5,10 +5,10 @@ from __future__ import annotations
import json
from pathlib import Path
-from flask import Blueprint, jsonify, request, send_file
+from flask import Blueprint, request, send_file
-from utils.recording import get_recording_manager, RECORDING_ROOT
-from utils.responses import api_success, api_error
+from utils.recording import RECORDING_ROOT, get_recording_manager
+from utils.responses import api_error, api_success
recordings_bp = Blueprint('recordings', __name__, url_prefix='/recordings')
diff --git a/routes/rtlamr.py b/routes/rtlamr.py
index a930b12..05a9fda 100644
--- a/routes/rtlamr.py
+++ b/routes/rtlamr.py
@@ -2,25 +2,23 @@
from __future__ import annotations
+import contextlib
import json
import queue
import subprocess
import threading
import time
from datetime import datetime
-from typing import Generator
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
-from utils.logging import sensor_logger as logger
-from utils.validation import (
- validate_frequency, validate_device_index, validate_gain, validate_ppm
-)
-from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
-from utils.process import safe_terminate, register_process, unregister_process
+from utils.logging import sensor_logger as logger
+from utils.process import register_process, unregister_process
+from utils.responses import api_error
+from utils.sse import sse_stream_fanout
+from utils.validation import validate_device_index, validate_frequency, validate_gain, validate_ppm
rtlamr_bp = Blueprint('rtlamr', __name__)
@@ -70,10 +68,8 @@ def stream_rtlamr_output(process: subprocess.Popen[bytes]) -> None:
process.terminate()
process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
process.kill()
- except Exception:
- pass
unregister_process(process)
# Kill companion rtl_tcp process
with rtl_tcp_lock:
@@ -82,10 +78,8 @@ def stream_rtlamr_output(process: subprocess.Popen[bytes]) -> None:
rtl_tcp_process.terminate()
rtl_tcp_process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
rtl_tcp_process.kill()
- except Exception:
- pass
unregister_process(rtl_tcp_process)
rtl_tcp_process = None
app_module.rtlamr_queue.put({'type': 'status', 'text': 'stopped'})
@@ -139,7 +133,7 @@ def start_rtlamr() -> Response:
# Get message type (default to scm)
msgtype = data.get('msgtype', 'scm')
output_format = data.get('format', 'json')
-
+
# Start rtl_tcp first
rtl_tcp_just_started = False
rtl_tcp_cmd_str = ''
@@ -191,16 +185,16 @@ def start_rtlamr() -> Response:
f'-format={output_format}',
f'-centerfreq={int(float(freq) * 1e6)}'
]
-
+
# Add filter options if provided
filterid = data.get('filterid')
if filterid:
cmd.append(f'-filterid={filterid}')
-
+
filtertype = data.get('filtertype')
if filtertype:
cmd.append(f'-filtertype={filtertype}')
-
+
# Unique messages only
if data.get('unique', True):
cmd.append('-unique=true')
diff --git a/routes/satellite.py b/routes/satellite.py
index dddd19f..a39f805 100644
--- a/routes/satellite.py
+++ b/routes/satellite.py
@@ -2,30 +2,25 @@
from __future__ import annotations
-import json
import math
import urllib.request
from datetime import datetime, timedelta
-from typing import Any, Optional
-from urllib.parse import urlparse
import requests
+from flask import Blueprint, jsonify, render_template, request
-from flask import Blueprint, jsonify, request, render_template, Response
-
-from utils.responses import api_success, api_error
from config import SHARED_OBSERVER_LOCATION_ENABLED
-
from data.satellites import TLE_SATELLITES
from utils.database import (
- get_tracked_satellites,
add_tracked_satellite,
bulk_add_tracked_satellites,
- update_tracked_satellite,
+ get_tracked_satellites,
remove_tracked_satellite,
+ update_tracked_satellite,
)
from utils.logging import satellite_logger as logger
-from utils.validation import validate_latitude, validate_longitude, validate_hours, validate_elevation
+from utils.responses import api_error
+from utils.validation import validate_elevation, validate_hours, validate_latitude, validate_longitude
satellite_bp = Blueprint('satellite', __name__, url_prefix='/satellite')
@@ -87,7 +82,7 @@ def init_tle_auto_refresh():
logger.info("TLE auto-refresh scheduled")
-def _fetch_iss_realtime(observer_lat: Optional[float] = None, observer_lon: Optional[float] = None) -> Optional[dict]:
+def _fetch_iss_realtime(observer_lat: float | None = None, observer_lon: float | None = None) -> dict | None:
"""
Fetch real-time ISS position from external APIs.
@@ -190,8 +185,8 @@ def satellite_dashboard():
def predict_passes():
"""Calculate satellite passes using skyfield."""
try:
- from skyfield.api import wgs84, EarthSatellite
from skyfield.almanac import find_discrete
+ from skyfield.api import EarthSatellite, wgs84
except ImportError:
return jsonify({
'status': 'error',
@@ -344,7 +339,7 @@ def predict_passes():
def get_satellite_position():
"""Get real-time positions of satellites."""
try:
- from skyfield.api import wgs84, EarthSatellite
+ from skyfield.api import EarthSatellite, wgs84
except ImportError:
return api_error('skyfield not installed', 503)
diff --git a/routes/sensor.py b/routes/sensor.py
index c6cd707..eaa5432 100644
--- a/routes/sensor.py
+++ b/routes/sensor.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import contextlib
import json
import math
import queue
@@ -9,21 +10,25 @@ import subprocess
import threading
import time
from datetime import datetime
-from typing import Any, Generator
+from typing import Any
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
-from utils.logging import sensor_logger as logger
-from utils.validation import (
- validate_frequency, validate_device_index, validate_gain, validate_ppm,
- validate_rtl_tcp_host, validate_rtl_tcp_port
-)
-from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
-from utils.process import safe_terminate, register_process, unregister_process
+from utils.logging import sensor_logger as logger
+from utils.process import register_process, unregister_process
+from utils.responses import api_error, api_success
from utils.sdr import SDRFactory, SDRType
+from utils.sse import sse_stream_fanout
+from utils.validation import (
+ validate_device_index,
+ validate_frequency,
+ validate_gain,
+ validate_ppm,
+ validate_rtl_tcp_host,
+ validate_rtl_tcp_port,
+)
sensor_bp = Blueprint('sensor', __name__)
@@ -137,10 +142,8 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None:
process.terminate()
process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
process.kill()
- except Exception:
- pass
unregister_process(process)
app_module.sensor_queue.put({'type': 'status', 'text': 'stopped'})
with app_module.sensor_lock:
diff --git a/routes/settings.py b/routes/settings.py
index 0b51f83..8a094b0 100644
--- a/routes/settings.py
+++ b/routes/settings.py
@@ -6,14 +6,14 @@ import os
import subprocess
import sys
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
from utils.database import (
- get_setting,
- set_setting,
delete_setting,
get_all_settings,
get_correlations,
+ get_setting,
+ set_setting,
)
from utils.logging import get_logger
from utils.responses import api_error, api_success
@@ -163,7 +163,7 @@ def check_dvb_driver_status() -> Response:
blacklist_contents = []
if blacklist_exists:
try:
- with open(BLACKLIST_FILE, 'r') as f:
+ with open(BLACKLIST_FILE) as f:
blacklist_contents = [line.strip() for line in f if line.strip() and not line.startswith('#')]
except Exception:
pass
diff --git a/routes/signalid.py b/routes/signalid.py
index 205a32e..af1b7c3 100644
--- a/routes/signalid.py
+++ b/routes/signalid.py
@@ -10,8 +10,8 @@ from typing import Any
from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
from utils.logging import get_logger
+from utils.responses import api_error
logger = get_logger('intercept.signalid')
diff --git a/routes/space_weather.py b/routes/space_weather.py
index 3683076..eae3bbd 100644
--- a/routes/space_weather.py
+++ b/routes/space_weather.py
@@ -13,7 +13,7 @@ from typing import Any
from flask import Blueprint, Response, jsonify
from utils.logging import get_logger
-from utils.responses import api_success, api_error
+from utils.responses import api_error
logger = get_logger('intercept.space_weather')
diff --git a/routes/spy_stations.py b/routes/spy_stations.py
index d0446ee..e06d13f 100644
--- a/routes/spy_stations.py
+++ b/routes/spy_stations.py
@@ -611,9 +611,9 @@ def get_station(station_id):
@spy_stations_bp.route('/filters')
def get_filters():
"""Return available filter options."""
- types = list(set(s['type'] for s in STATIONS))
- countries = sorted(list(set((s['country'], s['country_code']) for s in STATIONS)))
- modes = sorted(list(set(s['mode'].split('/')[0] for s in STATIONS)))
+ types = list({s['type'] for s in STATIONS})
+ countries = sorted({(s['country'], s['country_code']) for s in STATIONS})
+ modes = sorted({s['mode'].split('/')[0] for s in STATIONS})
return jsonify({
'status': 'success',
diff --git a/routes/sstv.py b/routes/sstv.py
index 3db0243..31f487a 100644
--- a/routes/sstv.py
+++ b/routes/sstv.py
@@ -6,23 +6,24 @@ ISS SSTV events occur during special commemorations and typically transmit on 14
from __future__ import annotations
+import contextlib
import queue
import threading
import time
from pathlib import Path
from typing import Any
-from flask import Blueprint, jsonify, request, Response, send_file
+from flask import Blueprint, Response, jsonify, request, send_file
-from utils.responses import api_success, api_error
import app as app_module
-from utils.logging import get_logger
-from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
+from utils.logging import get_logger
+from utils.responses import api_error
+from utils.sse import sse_stream_fanout
from utils.sstv import (
+ ISS_SSTV_FREQ,
get_sstv_decoder,
is_sstv_available,
- ISS_SSTV_FREQ,
)
logger = get_logger('intercept.sstv')
@@ -520,9 +521,11 @@ def iss_schedule():
return jsonify(_iss_schedule_cache)
try:
- from skyfield.api import wgs84, EarthSatellite
- from skyfield.almanac import find_discrete
from datetime import timedelta
+
+ from skyfield.almanac import find_discrete
+ from skyfield.api import EarthSatellite, wgs84
+
from data.satellites import TLE_SATELLITES
# Get ISS TLE
@@ -816,7 +819,5 @@ def decode_file():
finally:
# Clean up temp file
- try:
+ with contextlib.suppress(Exception):
Path(tmp_path).unlink()
- except Exception:
- pass
diff --git a/routes/sstv_general.py b/routes/sstv_general.py
index 89902f7..a6c4518 100644
--- a/routes/sstv_general.py
+++ b/routes/sstv_general.py
@@ -6,18 +6,17 @@ frequencies used by amateur radio operators worldwide.
from __future__ import annotations
+import contextlib
import queue
-import time
-from collections.abc import Generator
from pathlib import Path
from flask import Blueprint, Response, jsonify, request, send_file
-from utils.responses import api_success, api_error
import app as app_module
-from utils.logging import get_logger
-from utils.sse import sse_stream_fanout
from utils.event_pipeline import process_event
+from utils.logging import get_logger
+from utils.responses import api_error
+from utils.sse import sse_stream_fanout
from utils.sstv import (
get_general_sstv_decoder,
)
@@ -325,7 +324,5 @@ def decode_file():
return api_error(str(e), 500)
finally:
- try:
+ with contextlib.suppress(Exception):
Path(tmp_path).unlink()
- except Exception:
- pass
diff --git a/routes/subghz.py b/routes/subghz.py
index 6f0c02c..146afc6 100644
--- a/routes/subghz.py
+++ b/routes/subghz.py
@@ -6,25 +6,26 @@ signal replay/transmit, and wideband spectrum analysis.
from __future__ import annotations
+import contextlib
import queue
-from flask import Blueprint, jsonify, request, Response, send_file
+from flask import Blueprint, Response, jsonify, request, send_file
-from utils.responses import api_success, api_error
+from utils.constants import (
+ SUBGHZ_FREQ_MAX_MHZ,
+ SUBGHZ_FREQ_MIN_MHZ,
+ SUBGHZ_LNA_GAIN_MAX,
+ SUBGHZ_PRESETS,
+ SUBGHZ_SAMPLE_RATES,
+ SUBGHZ_TX_MAX_DURATION,
+ SUBGHZ_TX_VGA_GAIN_MAX,
+ SUBGHZ_VGA_GAIN_MAX,
+)
+from utils.event_pipeline import process_event
from utils.logging import get_logger
+from utils.responses import api_error
from utils.sse import sse_stream
from utils.subghz import get_subghz_manager
-from utils.event_pipeline import process_event
-from utils.constants import (
- SUBGHZ_FREQ_MIN_MHZ,
- SUBGHZ_FREQ_MAX_MHZ,
- SUBGHZ_LNA_GAIN_MAX,
- SUBGHZ_VGA_GAIN_MAX,
- SUBGHZ_TX_VGA_GAIN_MAX,
- SUBGHZ_TX_MAX_DURATION,
- SUBGHZ_SAMPLE_RATES,
- SUBGHZ_PRESETS,
-)
logger = get_logger('intercept.subghz')
@@ -36,10 +37,8 @@ _subghz_queue: queue.Queue = queue.Queue(maxsize=200)
def _event_callback(event: dict) -> None:
"""Forward SubGhzManager events to the SSE queue."""
- try:
+ with contextlib.suppress(Exception):
process_event('subghz', event, event.get('type'))
- except Exception:
- pass
try:
_subghz_queue.put_nowait(event)
except queue.Full:
diff --git a/routes/system.py b/routes/system.py
index e40d1da..ae0c365 100644
--- a/routes/system.py
+++ b/routes/system.py
@@ -22,7 +22,7 @@ from flask import Blueprint, Response, jsonify, request
from utils.constants import SSE_KEEPALIVE_INTERVAL, SSE_QUEUE_TIMEOUT
from utils.logging import sensor_logger as logger
-from utils.responses import api_success, api_error
+from utils.responses import api_error
from utils.sse import sse_stream_fanout
try:
diff --git a/routes/tscm/__init__.py b/routes/tscm/__init__.py
index 3e87cd8..0ca12fa 100644
--- a/routes/tscm/__init__.py
+++ b/routes/tscm/__init__.py
@@ -7,6 +7,7 @@ threat detection, and reporting.
from __future__ import annotations
+import contextlib
import json
import logging
import queue
@@ -23,9 +24,9 @@ from data.tscm_frequencies import (
get_sweep_preset,
)
from utils.database import (
+ acknowledge_tscm_threat,
add_device_timeline_entry,
add_tscm_threat,
- acknowledge_tscm_threat,
cleanup_old_timeline_entries,
create_tscm_schedule,
create_tscm_sweep,
@@ -43,6 +44,8 @@ from utils.database import (
update_tscm_schedule,
update_tscm_sweep,
)
+from utils.event_pipeline import process_event
+from utils.sse import sse_stream_fanout
from utils.tscm.baseline import (
BaselineComparator,
BaselineRecorder,
@@ -56,12 +59,10 @@ from utils.tscm.correlation import (
from utils.tscm.detector import ThreatDetector
from utils.tscm.device_identity import (
get_identity_engine,
- reset_identity_engine,
ingest_ble_dict,
ingest_wifi_dict,
+ reset_identity_engine,
)
-from utils.event_pipeline import process_event
-from utils.sse import sse_stream_fanout
# Import unified Bluetooth scanner helper for TSCM integration
try:
@@ -659,8 +660,8 @@ def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]:
Uses the BLE scanner module (bleak library) for proper manufacturer ID
detection, with fallback to system tools if bleak is unavailable.
"""
- import platform
import os
+ import platform
import re
import shutil
import subprocess
@@ -874,10 +875,8 @@ def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]:
except subprocess.TimeoutExpired:
process.kill()
- try:
+ with contextlib.suppress(OSError):
os.close(master_fd)
- except OSError:
- pass
logger.info(f"bluetoothctl scan found {len(devices)} devices")
@@ -914,7 +913,8 @@ def _scan_rf_signals(
"""
# Default stop check uses module-level _sweep_running
if stop_check is None:
- stop_check = lambda: not _sweep_running
+ def stop_check():
+ return not _sweep_running
import os
import shutil
import subprocess
@@ -954,11 +954,11 @@ def _scan_rf_signals(
# Tool exists but no device detected — try anyway (detection may have failed)
sdr_type = 'rtlsdr'
sweep_tool_path = rtl_power_path
- logger.info(f"No SDR detected but rtl_power found, attempting RTL-SDR scan")
+ logger.info("No SDR detected but rtl_power found, attempting RTL-SDR scan")
elif hackrf_sweep_path:
sdr_type = 'hackrf'
sweep_tool_path = hackrf_sweep_path
- logger.info(f"No SDR detected but hackrf_sweep found, attempting HackRF scan")
+ logger.info("No SDR detected but hackrf_sweep found, attempting HackRF scan")
if not sweep_tool_path:
logger.warning("No supported sweep tool found (rtl_power or hackrf_sweep)")
@@ -1059,14 +1059,14 @@ def _scan_rf_signals(
# Parse the CSV output (same format for both rtl_power and hackrf_sweep)
if os.path.exists(tmp_path) and os.path.getsize(tmp_path) > 0:
- with open(tmp_path, 'r') as f:
+ with open(tmp_path) as f:
for line in f:
parts = line.strip().split(',')
if len(parts) >= 7:
try:
# CSV format: date, time, hz_low, hz_high, hz_step, samples, db_values...
hz_low = int(parts[2].strip())
- hz_high = int(parts[3].strip())
+ int(parts[3].strip())
hz_step = float(parts[4].strip())
db_values = [float(x) for x in parts[6:] if x.strip()]
@@ -1100,10 +1100,8 @@ def _scan_rf_signals(
finally:
# Cleanup temp file
- try:
+ with contextlib.suppress(OSError):
os.unlink(tmp_path)
- except OSError:
- pass
# Deduplicate nearby frequencies (within 100kHz)
if signals:
@@ -1816,9 +1814,11 @@ def _generate_assessment(summary: dict) -> str:
# =============================================================================
# Import sub-modules to register routes on tscm_bp
# =============================================================================
-from routes.tscm import sweep # noqa: E402, F401
-from routes.tscm import baseline # noqa: E402, F401
-from routes.tscm import cases # noqa: E402, F401
-from routes.tscm import meeting # noqa: E402, F401
-from routes.tscm import analysis # noqa: E402, F401
-from routes.tscm import schedules # noqa: E402, F401
+from routes.tscm import (
+ analysis, # noqa: E402, F401
+ baseline, # noqa: E402, F401
+ cases, # noqa: E402, F401
+ meeting, # noqa: E402, F401
+ schedules, # noqa: E402, F401
+ sweep, # noqa: E402, F401
+)
diff --git a/routes/tscm/analysis.py b/routes/tscm/analysis.py
index bca27d3..3dca547 100644
--- a/routes/tscm/analysis.py
+++ b/routes/tscm/analysis.py
@@ -14,7 +14,6 @@ from datetime import datetime
from flask import Response, jsonify, request
from routes.tscm import (
- _current_sweep_id,
_generate_assessment,
tscm_bp,
)
@@ -253,9 +252,9 @@ def get_pdf_report():
summary, and mandatory disclaimers.
"""
try:
- from utils.tscm.reports import generate_report, get_pdf_report
- from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager
from routes.tscm import _current_sweep_id
+ from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager
+ from utils.tscm.reports import generate_report, get_pdf_report
sweep_id = request.args.get('sweep_id', _current_sweep_id, type=int)
if not sweep_id:
@@ -306,9 +305,9 @@ def get_technical_annex():
for audit purposes. No packet data included.
"""
try:
- from utils.tscm.reports import generate_report, get_json_annex, get_csv_annex
- from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager
from routes.tscm import _current_sweep_id
+ from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager
+ from utils.tscm.reports import generate_report, get_csv_annex, get_json_annex
sweep_id = request.args.get('sweep_id', _current_sweep_id, type=int)
format_type = request.args.get('format', 'json')
@@ -900,8 +899,8 @@ def get_device_timeline_endpoint(identifier: str):
and meeting window correlation.
"""
try:
- from utils.tscm.advanced import get_timeline_manager
from utils.database import get_device_timeline
+ from utils.tscm.advanced import get_timeline_manager
protocol = request.args.get('protocol', 'bluetooth')
since_hours = request.args.get('since_hours', 24, type=int)
diff --git a/routes/tscm/baseline.py b/routes/tscm/baseline.py
index d5c5104..4680b09 100644
--- a/routes/tscm/baseline.py
+++ b/routes/tscm/baseline.py
@@ -25,7 +25,6 @@ from utils.database import (
set_active_tscm_baseline,
)
from utils.tscm.baseline import (
- BaselineComparator,
get_comparison_for_active_baseline,
)
@@ -213,7 +212,6 @@ def get_baseline_diff(baseline_id: int, sweep_id: int):
def get_baseline_health(baseline_id: int):
"""Get health assessment for a baseline."""
try:
- from utils.tscm.advanced import BaselineHealth
baseline = get_tscm_baseline(baseline_id)
if not baseline:
diff --git a/routes/tscm/meeting.py b/routes/tscm/meeting.py
index 7bccf8e..04ae3bd 100644
--- a/routes/tscm/meeting.py
+++ b/routes/tscm/meeting.py
@@ -91,7 +91,6 @@ def start_tracked_meeting():
"""
from utils.database import start_meeting_window
from utils.tscm.advanced import get_timeline_manager
- from routes.tscm import _current_sweep_id
data = request.get_json() or {}
@@ -156,9 +155,9 @@ def end_tracked_meeting(meeting_id: int):
def get_meeting_summary_endpoint(meeting_id: int):
"""Get detailed summary of device activity during a meeting."""
try:
+ from routes.tscm import _current_sweep_id
from utils.database import get_meeting_windows
from utils.tscm.advanced import generate_meeting_summary, get_timeline_manager
- from routes.tscm import _current_sweep_id
# Get meeting window
windows = get_meeting_windows(_current_sweep_id or 0)
@@ -194,7 +193,6 @@ def get_meeting_summary_endpoint(meeting_id: int):
def get_active_meeting():
"""Get currently active meeting window."""
from utils.database import get_active_meeting_window
- from routes.tscm import _current_sweep_id
meeting = get_active_meeting_window(_current_sweep_id)
diff --git a/routes/tscm/schedules.py b/routes/tscm/schedules.py
index c29267a..44dc3c2 100644
--- a/routes/tscm/schedules.py
+++ b/routes/tscm/schedules.py
@@ -16,7 +16,6 @@ from routes.tscm import (
_get_schedule_timezone,
_next_run_from_cron,
_start_sweep_internal,
- _sweep_running,
tscm_bp,
)
from utils.database import (
diff --git a/routes/tscm/sweep.py b/routes/tscm/sweep.py
index 88a8c91..480f109 100644
--- a/routes/tscm/sweep.py
+++ b/routes/tscm/sweep.py
@@ -7,27 +7,25 @@ Handles /sweep/*, /status, /devices, /presets/*, /feed/*,
from __future__ import annotations
-import json
import logging
import os
import platform
import re
-import shutil
import subprocess
from typing import Any
from flask import Response, jsonify, request
+from data.tscm_frequencies import get_all_sweep_presets, get_sweep_preset
from routes.tscm import (
+ _baseline_recorder,
_current_sweep_id,
_emit_event,
_start_sweep_internal,
_sweep_running,
tscm_bp,
tscm_queue,
- _baseline_recorder,
)
-from data.tscm_frequencies import get_all_sweep_presets, get_sweep_preset
from utils.database import get_tscm_sweep, update_tscm_sweep
from utils.event_pipeline import process_event
from utils.sse import sse_stream_fanout
@@ -38,7 +36,6 @@ logger = logging.getLogger('intercept.tscm')
@tscm_bp.route('/status')
def tscm_status():
"""Check if any TSCM operation is currently running."""
- from routes.tscm import _sweep_running
return jsonify({'running': _sweep_running})
@@ -98,7 +95,6 @@ def stop_sweep():
@tscm_bp.route('/sweep/status')
def sweep_status():
"""Get current sweep status."""
- from routes.tscm import _sweep_running, _current_sweep_id
status = {
'running': _sweep_running,
@@ -116,7 +112,6 @@ def sweep_status():
@tscm_bp.route('/sweep/stream')
def sweep_stream():
"""SSE stream for real-time sweep updates."""
- from routes.tscm import tscm_queue
def _on_msg(msg: dict[str, Any]) -> None:
process_event('tscm', msg, msg.get('type'))
@@ -218,7 +213,7 @@ def get_tscm_devices():
capture_output=True, text=True, timeout=5
)
blocks = re.split(r'(?=^hci\d+:)', result.stdout, flags=re.MULTILINE)
- for idx, block in enumerate(blocks):
+ for _idx, block in enumerate(blocks):
if block.strip():
first_line = block.split('\n')[0]
match = re.match(r'(hci\d+):', first_line)
@@ -353,7 +348,6 @@ def get_preset(preset_name: str):
@tscm_bp.route('/feed/wifi', methods=['POST'])
def feed_wifi():
"""Feed WiFi device data for baseline recording."""
- from routes.tscm import _baseline_recorder
data = request.get_json()
if data:
@@ -367,7 +361,6 @@ def feed_wifi():
@tscm_bp.route('/feed/bluetooth', methods=['POST'])
def feed_bluetooth():
"""Feed Bluetooth device data for baseline recording."""
- from routes.tscm import _baseline_recorder
data = request.get_json()
if data:
@@ -378,7 +371,6 @@ def feed_bluetooth():
@tscm_bp.route('/feed/rf', methods=['POST'])
def feed_rf():
"""Feed RF signal data for baseline recording."""
- from routes.tscm import _baseline_recorder
data = request.get_json()
if data:
diff --git a/routes/updater.py b/routes/updater.py
index 285471a..a8f9e33 100644
--- a/routes/updater.py
+++ b/routes/updater.py
@@ -4,8 +4,8 @@ from __future__ import annotations
from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
from utils.logging import get_logger
+from utils.responses import api_error
from utils.updater import (
check_for_updates,
dismiss_update,
diff --git a/routes/vdl2.py b/routes/vdl2.py
index d6dd0f5..8499732 100644
--- a/routes/vdl2.py
+++ b/routes/vdl2.py
@@ -2,7 +2,7 @@
from __future__ import annotations
-import io
+import contextlib
import json
import os
import platform
@@ -13,12 +13,11 @@ import subprocess
import threading
import time
from datetime import datetime
-from typing import Any, Generator
+from typing import Any
from flask import Blueprint, Response, jsonify, request
import app as app_module
-from utils.responses import api_success, api_error
from utils.acars_translator import translate_message
from utils.constants import (
PROCESS_START_WAIT,
@@ -30,6 +29,7 @@ from utils.event_pipeline import process_event
from utils.flight_correlator import get_flight_correlator
from utils.logging import sensor_logger as logger
from utils.process import register_process, unregister_process
+from utils.responses import api_error
from utils.sdr import SDRFactory, SDRType
from utils.sse import sse_stream_fanout
from utils.validation import validate_device_index, validate_gain, validate_ppm
@@ -105,10 +105,8 @@ def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) ->
app_module.vdl2_queue.put(data)
# Feed flight correlator
- try:
+ with contextlib.suppress(Exception):
get_flight_correlator().add_vdl2_message(data)
- except Exception:
- pass
# Log if enabled
if app_module.logging_enabled:
@@ -134,10 +132,8 @@ def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) ->
process.terminate()
process.wait(timeout=2)
except Exception:
- try:
+ with contextlib.suppress(Exception):
process.kill()
- except Exception:
- pass
unregister_process(process)
app_module.vdl2_queue.put({'type': 'status', 'status': 'stopped'})
with app_module.vdl2_lock:
@@ -275,7 +271,7 @@ def start_vdl2() -> Response:
)
os.close(slave_fd)
# Wrap master_fd as a text file for line-buffered reading
- process.stdout = io.open(master_fd, 'r', buffering=1)
+ process.stdout = open(master_fd, buffering=1)
is_text_mode = True
else:
process = subprocess.Popen(
diff --git a/routes/waterfall_websocket.py b/routes/waterfall_websocket.py
index 99ceab3..991804f 100644
--- a/routes/waterfall_websocket.py
+++ b/routes/waterfall_websocket.py
@@ -372,7 +372,6 @@ def init_waterfall_websocket(app: Flask):
capture_center_mhz = 0.0
capture_start_freq = 0.0
capture_end_freq = 0.0
- capture_span_mhz = 0.0
# Queue for outgoing messages — only the main loop touches ws.send()
send_queue = queue.Queue(maxsize=120)
@@ -619,7 +618,6 @@ def init_waterfall_websocket(app: Flask):
capture_center_mhz = center_freq_mhz
capture_start_freq = start_freq
capture_end_freq = end_freq
- capture_span_mhz = effective_span_mhz
my_generation = _set_shared_capture_state(
running=True,
diff --git a/routes/weather_sat.py b/routes/weather_sat.py
index b3085b4..290b883 100644
--- a/routes/weather_sat.py
+++ b/routes/weather_sat.py
@@ -8,18 +8,26 @@ from __future__ import annotations
import queue
-from flask import Blueprint, jsonify, request, Response, send_file
+from flask import Blueprint, Response, jsonify, request, send_file
-from utils.responses import api_success, api_error
from utils.logging import get_logger
+from utils.responses import api_error
from utils.sse import sse_stream
-from utils.validation import validate_device_index, validate_gain, validate_latitude, validate_longitude, validate_elevation, validate_rtl_tcp_host, validate_rtl_tcp_port
+from utils.validation import (
+ validate_device_index,
+ validate_elevation,
+ validate_gain,
+ validate_latitude,
+ validate_longitude,
+ validate_rtl_tcp_host,
+ validate_rtl_tcp_port,
+)
from utils.weather_sat import (
+ DEFAULT_SAMPLE_RATE,
+ WEATHER_SATELLITES,
+ CaptureProgress,
get_weather_sat_decoder,
is_weather_sat_available,
- CaptureProgress,
- WEATHER_SATELLITES,
- DEFAULT_SAMPLE_RATE,
)
logger = get_logger('intercept.weather_sat')
@@ -613,7 +621,7 @@ def enable_schedule():
gain=gain_val,
bias_t=bool(data.get('bias_t', False)),
)
- except Exception as e:
+ except Exception:
logger.exception("Failed to enable weather sat scheduler")
return jsonify({
'status': 'error',
diff --git a/routes/websdr.py b/routes/websdr.py
index a7e762b..bc27a02 100644
--- a/routes/websdr.py
+++ b/routes/websdr.py
@@ -9,11 +9,10 @@ import re
import struct
import threading
import time
-from typing import Optional
-from flask import Blueprint, Flask, jsonify, request, Response
+from flask import Blueprint, Flask, Response, jsonify, request
-from utils.responses import api_success, api_error
+from utils.responses import api_error, api_success
try:
from flask_sock import Sock
@@ -21,7 +20,9 @@ try:
except ImportError:
WEBSOCKET_AVAILABLE = False
-from utils.kiwisdr import KiwiSDRClient, KIWI_SAMPLE_RATE, VALID_MODES, parse_host_port
+import contextlib
+
+from utils.kiwisdr import KIWI_SAMPLE_RATE, VALID_MODES, KiwiSDRClient, parse_host_port
from utils.logging import get_logger
logger = get_logger('intercept.websdr')
@@ -38,7 +39,7 @@ _cache_timestamp: float = 0
CACHE_TTL = 3600 # 1 hour
-def _parse_gps_coord(coord_str: str) -> Optional[float]:
+def _parse_gps_coord(coord_str: str) -> float | None:
"""Parse a GPS coordinate string like '51.5074' or '(-33.87)' into a float."""
if not coord_str:
return None
@@ -70,8 +71,8 @@ KIWI_DATA_URLS = [
def _fetch_kiwi_receivers() -> list[dict]:
"""Fetch the KiwiSDR receiver list from the public directory."""
- import urllib.request
import json
+ import urllib.request
receivers = []
raw = None
@@ -335,7 +336,7 @@ def websdr_status() -> Response:
# KIWISDR AUDIO PROXY
# ============================================
-_kiwi_client: Optional[KiwiSDRClient] = None
+_kiwi_client: KiwiSDRClient | None = None
_kiwi_lock = threading.Lock()
_kiwi_audio_queue: queue.Queue = queue.Queue(maxsize=200)
@@ -387,26 +388,18 @@ def _handle_kiwi_command(ws, cmd: str, data: dict) -> None:
try:
_kiwi_audio_queue.put_nowait(header + pcm_bytes)
except queue.Full:
- try:
+ with contextlib.suppress(queue.Empty):
_kiwi_audio_queue.get_nowait()
- except queue.Empty:
- pass
- try:
+ with contextlib.suppress(queue.Full):
_kiwi_audio_queue.put_nowait(header + pcm_bytes)
- except queue.Full:
- pass
def on_error(msg):
- try:
+ with contextlib.suppress(Exception):
ws.send(json.dumps({'type': 'error', 'message': msg}))
- except Exception:
- pass
def on_disconnect():
- try:
+ with contextlib.suppress(Exception):
ws.send(json.dumps({'type': 'disconnected'}))
- except Exception:
- pass
with _kiwi_lock:
_kiwi_client = KiwiSDRClient(
diff --git a/routes/wefax.py b/routes/wefax.py
index f534e85..150a958 100644
--- a/routes/wefax.py
+++ b/routes/wefax.py
@@ -6,13 +6,14 @@ maritime/aviation weather services worldwide.
from __future__ import annotations
+import contextlib
import queue
from flask import Blueprint, Response, jsonify, request, send_file
-from utils.responses import api_success, api_error
import app as app_module
from utils.logging import get_logger
+from utils.responses import api_error
from utils.sdr import SDRType
from utils.sse import sse_stream_fanout
from utils.validation import validate_frequency
@@ -129,10 +130,9 @@ def start_decoder():
frequency_reference = str(data.get('frequency_reference', 'auto')).strip().lower()
sdr_type_str = str(data.get('sdr_type', 'rtlsdr')).lower()
- try:
-     sdr_type = SDRType(sdr_type_str)
- except ValueError:
-     sdr_type = SDRType.RTL_SDR
+ sdr_type = SDRType.RTL_SDR
+ with contextlib.suppress(ValueError):
+     sdr_type = SDRType(sdr_type_str)
if not frequency_reference:
frequency_reference = 'auto'
diff --git a/routes/wifi.py b/routes/wifi.py
index 61fbbbb..c36ee8c 100644
--- a/routes/wifi.py
+++ b/routes/wifi.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import contextlib
import fcntl
import json
import os
@@ -11,39 +12,25 @@ import re
import subprocess
import threading
import time
-from typing import Any, Generator
+from typing import Any
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.responses import api_success, api_error
import app as app_module
-from utils.dependencies import check_tool, get_tool_path
-from utils.logging import wifi_logger as logger
-from utils.process import is_valid_mac, is_valid_channel
-from utils.validation import validate_wifi_channel, validate_mac_address, validate_network_interface
-from utils.sse import format_sse, sse_stream_fanout
-from utils.event_pipeline import process_event
from data.oui import get_manufacturer
from utils.constants import (
- WIFI_TERMINATE_TIMEOUT,
- PMKID_TERMINATE_TIMEOUT,
SSE_KEEPALIVE_INTERVAL,
SSE_QUEUE_TIMEOUT,
- WIFI_CSV_PARSE_INTERVAL,
- WIFI_CSV_TIMEOUT_WARNING,
- SUBPROCESS_TIMEOUT_SHORT,
SUBPROCESS_TIMEOUT_MEDIUM,
- SUBPROCESS_TIMEOUT_LONG,
- DEAUTH_TIMEOUT,
- MIN_DEAUTH_COUNT,
- MAX_DEAUTH_COUNT,
- DEFAULT_DEAUTH_COUNT,
- PROCESS_START_WAIT,
- MONITOR_MODE_DELAY,
- WIFI_CAPTURE_PATH_PREFIX,
- HANDSHAKE_CAPTURE_PATH_PREFIX,
- PMKID_CAPTURE_PATH_PREFIX,
+ SUBPROCESS_TIMEOUT_SHORT,
)
+from utils.dependencies import check_tool, get_tool_path
+from utils.event_pipeline import process_event
+from utils.logging import wifi_logger as logger
+from utils.process import is_valid_channel, is_valid_mac
+from utils.responses import api_error, api_success
+from utils.sse import format_sse, sse_stream_fanout
+from utils.validation import validate_network_interface, validate_wifi_channel
wifi_bp = Blueprint('wifi', __name__, url_prefix='/wifi')
@@ -201,9 +188,9 @@ def _get_interface_details(iface_name):
# Get MAC address
try:
mac_path = f'/sys/class/net/{iface_name}/address'
- with open(mac_path, 'r') as f:
+ with open(mac_path) as f:
details['mac'] = f.read().strip().upper()
- except (FileNotFoundError, IOError):
+ except OSError:
pass
# Get driver name
@@ -212,7 +199,7 @@ def _get_interface_details(iface_name):
if os.path.islink(driver_link):
driver_path = os.readlink(driver_link)
details['driver'] = os.path.basename(driver_path)
- except (FileNotFoundError, IOError, OSError):
+ except OSError:
pass
# Try airmon-ng first for chipset info (most reliable for WiFi adapters)
@@ -230,11 +217,10 @@ def _get_interface_details(iface_name):
break
# Also try space-separated format
parts = line.split()
- if len(parts) >= 4:
- if parts[1] == iface_name or parts[1].startswith(iface_name):
- details['driver'] = parts[2]
- details['chipset'] = ' '.join(parts[3:])
- break
+ if len(parts) >= 4 and (parts[1] == iface_name or parts[1].startswith(iface_name)):
+ details['driver'] = parts[2]
+ details['chipset'] = ' '.join(parts[3:])
+ break
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
@@ -246,10 +232,10 @@ def _get_interface_details(iface_name):
# Try to get USB product name
for usb_path in [f'{device_path}/product', f'{device_path}/../product']:
try:
- with open(usb_path, 'r') as f:
+ with open(usb_path) as f:
details['chipset'] = f.read().strip()
break
- except (FileNotFoundError, IOError):
+ except OSError:
pass
# If no USB product, try lsusb for USB devices
@@ -257,7 +243,7 @@ def _get_interface_details(iface_name):
try:
# Get USB bus/device info
uevent_path = f'{device_path}/uevent'
- with open(uevent_path, 'r') as f:
+ with open(uevent_path) as f:
for line in f:
if line.startswith('PRODUCT='):
# PRODUCT format: vendor/product/bcdDevice
@@ -280,9 +266,9 @@ def _get_interface_details(iface_name):
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
break
- except (FileNotFoundError, IOError):
+ except OSError:
pass
- except (FileNotFoundError, IOError, OSError):
+ except OSError:
pass
return details
@@ -294,7 +280,7 @@ def parse_airodump_csv(csv_path):
clients = {}
try:
- with open(csv_path, 'r', errors='replace') as f:
+ with open(csv_path, errors='replace') as f:
content = f.read()
sections = content.split('\n\n')
@@ -602,7 +588,6 @@ def toggle_monitor_mode():
return api_success(data={'monitor_interface': app_module.wifi_monitor_interface})
except Exception as e:
- import traceback
logger.error(f"Error enabling monitor mode: {e}", exc_info=True)
return api_error(str(e))
@@ -683,11 +668,9 @@ def start_wifi_scan():
csv_path = '/tmp/intercept_wifi'
- for f in [f'/tmp/intercept_wifi-01.csv', f'/tmp/intercept_wifi-01.cap']:
- try:
+ for f in ['/tmp/intercept_wifi-01.csv', '/tmp/intercept_wifi-01.cap']:
+ with contextlib.suppress(OSError):
os.remove(f)
- except OSError:
- pass
airodump_path = get_tool_path('airodump-ng')
cmd = [
@@ -1021,7 +1004,7 @@ def check_pmkid_status():
try:
hash_file = capture_file.replace('.pcapng', '.22000')
- result = subprocess.run(
+ subprocess.run(
['hcxpcapngtool', '-o', hash_file, capture_file],
capture_output=True, text=True, timeout=10
)
@@ -1170,7 +1153,7 @@ def stream_wifi():
# V2 API Endpoints - Using unified WiFi scanner
# =============================================================================
-from utils.wifi.scanner import get_wifi_scanner, reset_wifi_scanner
+from utils.wifi.scanner import get_wifi_scanner
@wifi_bp.route('/v2/capabilities')
diff --git a/routes/wifi_v2.py b/routes/wifi_v2.py
index 9ab04bc..dc3193f 100644
--- a/routes/wifi_v2.py
+++ b/routes/wifi_v2.py
@@ -7,26 +7,26 @@ channel analysis, hidden SSID correlation, and SSE streaming.
from __future__ import annotations
+import contextlib
import csv
import io
import json
import logging
+from collections.abc import Generator
from datetime import datetime
-from typing import Generator
-from flask import Blueprint, jsonify, request, Response
+from flask import Blueprint, Response, jsonify, request
-from utils.wifi import (
- get_wifi_scanner,
- analyze_channels,
- get_hidden_correlator,
- SCAN_MODE_QUICK,
- SCAN_MODE_DEEP,
-)
-from utils.responses import api_success, api_error
+from utils.event_pipeline import process_event
+from utils.responses import api_error
from utils.sse import format_sse
from utils.validation import validate_wifi_channel
-from utils.event_pipeline import process_event
+from utils.wifi import (
+ SCAN_MODE_DEEP,
+ analyze_channels,
+ get_hidden_correlator,
+ get_wifi_scanner,
+)
logger = logging.getLogger(__name__)
@@ -407,10 +407,8 @@ def event_stream():
scanner = get_wifi_scanner()
for event in scanner.get_event_stream():
- try:
+ with contextlib.suppress(Exception):
process_event('wifi', event, event.get('type'))
- except Exception:
- pass
yield format_sse(event)
response = Response(generate(), mimetype='text/event-stream')
diff --git a/templates/adsb_dashboard.html b/templates/adsb_dashboard.html
index 0c98734..229adc8 100644
--- a/templates/adsb_dashboard.html
+++ b/templates/adsb_dashboard.html
@@ -51,7 +51,7 @@