#!/usr/bin/env python3
"""
INTERCEPT - Signal Intelligence Platform
A comprehensive signal intelligence tool featuring:
- Pager decoding (POCSAG/FLEX)
- 433MHz sensor monitoring
- ADS-B aircraft tracking with WarGames-style display
- Satellite pass prediction and Iridium burst detection
- WiFi reconnaissance and drone detection
- Bluetooth scanning
Requires RTL-SDR hardware for RF modes.
"""
import sys
# Startup diagnostics: print which interpreter is running and whether one
# specific user site-packages directory is on the import path.
print(f"[Debug] Python executable: {sys.executable}")
print(f"[Debug] Python version: {sys.version}")
# NOTE(review): the path below is hard-coded to user "radio" and Python 3.10;
# under any other account or interpreter it will normally report False.
print(f"[Debug] User site-packages: {'/home/radio/.local/lib/python3.10/site-packages' in sys.path}")
# Quick skyfield check at startup
try:
    import skyfield
    print(f"[Debug] Skyfield loaded: {skyfield.__file__}")
except Exception as e:
    # Any failure while importing skyfield is reported but does not abort startup.
    print(f"[Debug] Skyfield import failed: {e}")
import subprocess
import shutil
import re
import threading
import queue
import pty
import os
import select
import json
import time
from flask import Flask, render_template_string, jsonify, request, Response, send_file
app = Flask(__name__)
def load_oui_database():
    """Load the OUI (MAC prefix -> manufacturer) table from ``oui_database.json``.

    The file is looked up in the directory that contains this module. Keys
    that start with ``_`` are treated as comment fields and dropped.

    Returns:
        dict | None: the prefix -> manufacturer mapping, or ``None`` when the
        file is missing, unreadable, not valid JSON, or not a JSON object, so
        the caller can fall back to the built-in table.
    """
    oui_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'oui_database.json')
    try:
        # Read as UTF-8 explicitly: the locale default encoding may not be
        # UTF-8 and would then fail on non-ASCII manufacturer names.
        with open(oui_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        # No external file is the normal case; fall back silently.
        return None
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"[OUI] Error loading oui_database.json: {e}, using built-in database")
        return None

    if not isinstance(data, dict):
        print(
            "[OUI] Error loading oui_database.json: expected a JSON object, "
            f"got {type(data).__name__}, using built-in database"
        )
        return None

    # Remove comment fields
    return {k: v for k, v in data.items() if not k.startswith('_')}
# Global process management
# One handle, one queue and one lock per mode. Handles are None until a
# process has been stored in them.
current_process = None  # pager mode: holds the multimon-ng Popen while decoding runs
sensor_process = None
wifi_process = None
bt_process = None
# Queues carry event dicts produced by background threads
# (e.g. stream_decoder puts {'type': ..., ...} items on output_queue).
output_queue = queue.Queue()
sensor_queue = queue.Queue()
wifi_queue = queue.Queue()
bt_queue = queue.Queue()
# Locks guarding the matching *_process handle (process_lock guards current_process).
process_lock = threading.Lock()
sensor_lock = threading.Lock()
wifi_lock = threading.Lock()
bt_lock = threading.Lock()
# Logging settings
logging_enabled = False
log_file_path = 'pager_messages.log'
# WiFi state
wifi_monitor_interface = None
wifi_networks = {}  # BSSID -> network info
wifi_clients = {}  # Client MAC -> client info
wifi_handshakes = []  # Captured handshakes
# Bluetooth state
bt_interface = None
bt_devices = {}  # MAC -> device info
bt_beacons = {}  # MAC -> beacon info (AirTags, Tiles, iBeacons)
bt_services = {}  # MAC -> list of services
# Aircraft (ADS-B) state
adsb_process = None
adsb_queue = queue.Queue()
adsb_lock = threading.Lock()
adsb_aircraft = {}  # ICAO hex -> aircraft info
# Satellite state
satellite_process = None
satellite_queue = queue.Queue()
satellite_lock = threading.Lock()
iridium_bursts = []  # List of detected Iridium bursts
satellite_passes = []  # Predicted satellite passes
# TLE data for satellite tracking (updated periodically)
# Maps a short key to a tuple of (display name, TLE line 1, TLE line 2).
# NOTE(review): the element sets below look like placeholders (identical
# epoch, zeroed terms and trailing checksum fields) — confirm they are
# replaced with current data before being used for pass prediction.
TLE_SATELLITES = {
    'ISS': ('ISS (ZARYA)',
            '1 25544U 98067A 24001.00000000 .00000000 00000-0 00000-0 0 0000',
            '2 25544 51.6400 0.0000 0000000 0.0000 0.0000 15.50000000000000'),
    'NOAA-15': ('NOAA 15',
                '1 25338U 98030A 24001.00000000 .00000-0 00000-0 00000-0 0 0000',
                '2 25338 98.7300 0.0000 0010000 0.0000 0.0000 14.26000000000000'),
    'NOAA-18': ('NOAA 18',
                '1 28654U 05018A 24001.00000000 .00000-0 00000-0 00000-0 0 0000',
                '2 28654 98.8800 0.0000 0014000 0.0000 0.0000 14.12000000000000'),
    'NOAA-19': ('NOAA 19',
                '1 33591U 09005A 24001.00000000 .00000-0 00000-0 00000-0 0 0000',
                '2 33591 99.1900 0.0000 0014000 0.0000 0.0000 14.12000000000000'),
    'METEOR-M2': ('METEOR-M 2',
                  '1 40069U 14037A 24001.00000000 .00000-0 00000-0 00000-0 0 0000',
                  '2 40069 98.5400 0.0000 0005000 0.0000 0.0000 14.21000000000000'),
}
# Known beacon prefixes for detection
# NOTE(review): how these prefixes are matched (against a MAC address or
# against advertisement data) is not visible in this part of the file —
# verify against the code that consumes them.
AIRTAG_PREFIXES = ['4C:00']  # Apple continuity
TILE_PREFIXES = ['C4:E7', 'DC:54', 'E4:B0', 'F8:8A']
SAMSUNG_TRACKER = ['58:4D', 'A0:75']
# Drone detection patterns
# SSID fragments grouped by vendor. Several entries are short, generic
# strings ('Nano', 'Air-', 'Mini-'), so matches are a hint, not proof.
DRONE_SSID_PATTERNS = [
    # DJI
    'DJI-', 'DJI_', 'Mavic', 'Phantom', 'Spark-', 'Mini-', 'Air-', 'Inspire',
    'Matrice', 'Avata', 'FPV-', 'Osmo', 'RoboMaster', 'Tello',
    # Parrot
    'Parrot', 'Bebop', 'Anafi', 'Disco-', 'Mambo', 'Swing',
    # Autel
    'Autel', 'EVO-', 'Dragonfish', 'Lite+', 'Nano',
    # Skydio
    'Skydio',
    # Other brands
    'Holy Stone', 'Potensic', 'SYMA', 'Hubsan', 'Eachine', 'FIMI',
    'Xiaomi_FIMI', 'Yuneec', 'Typhoon', 'PowerVision', 'PowerEgg',
    # Generic drone patterns
    'Drone', 'UAV-', 'Quadcopter', 'FPV_', 'RC-Drone'
]
# Drone OUI prefixes (MAC address prefixes for drone manufacturers)
# Each prefix must appear exactly once: in a dict literal a repeated key is
# silently overwritten by the last occurrence. Previously '60:60:1F' (DJI)
# was overwritten by a later 'Yuneec' entry and 'A0:14:3D' was listed under
# both DJI and Parrot; each is now kept only under its registered vendor.
DRONE_OUI_PREFIXES = {
    # DJI
    '60:60:1F': 'DJI', '48:1C:B9': 'DJI', '34:D2:62': 'DJI', 'E0:DB:55': 'DJI',
    'C8:6C:87': 'DJI', '70:D7:11': 'DJI', '98:3A:56': 'DJI',
    # Parrot
    '90:03:B7': 'Parrot', 'A0:14:3D': 'Parrot', '00:12:1C': 'Parrot', '00:26:7E': 'Parrot',
    # Autel
    '8C:F5:A3': 'Autel', 'D8:E0:E1': 'Autel',
    # Yuneec
    # NOTE(review): no Yuneec prefix is listed; the only previous entry
    # duplicated DJI's '60:60:1F'. Add a verified Yuneec OUI here if needed.
    # Skydio
    'F8:0F:6F': 'Skydio',
}
# OUI Database for manufacturer lookup (expanded)
OUI_DATABASE = {
# Apple (extensive list)
'00:25:DB': 'Apple', '04:52:F3': 'Apple', '0C:3E:9F': 'Apple', '10:94:BB': 'Apple',
'14:99:E2': 'Apple', '20:78:F0': 'Apple', '28:6A:BA': 'Apple', '3C:22:FB': 'Apple',
'40:98:AD': 'Apple', '48:D7:05': 'Apple', '4C:57:CA': 'Apple', '54:4E:90': 'Apple',
'5C:97:F3': 'Apple', '60:F8:1D': 'Apple', '68:DB:CA': 'Apple', '70:56:81': 'Apple',
'78:7B:8A': 'Apple', '7C:D1:C3': 'Apple', '84:FC:FE': 'Apple', '8C:2D:AA': 'Apple',
'90:B0:ED': 'Apple', '98:01:A7': 'Apple', '98:D6:BB': 'Apple', 'A4:D1:D2': 'Apple',
'AC:BC:32': 'Apple', 'B0:34:95': 'Apple', 'B8:C1:11': 'Apple', 'C8:69:CD': 'Apple',
'D0:03:4B': 'Apple', 'DC:A9:04': 'Apple', 'E0:C7:67': 'Apple', 'F0:18:98': 'Apple',
'F4:5C:89': 'Apple', '78:4F:43': 'Apple', '00:CD:FE': 'Apple', '04:4B:ED': 'Apple',
'04:D3:CF': 'Apple', '08:66:98': 'Apple', '0C:74:C2': 'Apple', '10:DD:B1': 'Apple',
'14:10:9F': 'Apple', '18:EE:69': 'Apple', '1C:36:BB': 'Apple', '24:A0:74': 'Apple',
'28:37:37': 'Apple', '2C:BE:08': 'Apple', '34:08:BC': 'Apple', '38:C9:86': 'Apple',
'3C:06:30': 'Apple', '44:D8:84': 'Apple', '48:A9:1C': 'Apple', '4C:32:75': 'Apple',
'50:32:37': 'Apple', '54:26:96': 'Apple', '58:B0:35': 'Apple', '5C:F7:E6': 'Apple',
'64:A3:CB': 'Apple', '68:FE:F7': 'Apple', '6C:4D:73': 'Apple', '70:DE:E2': 'Apple',
'74:E2:F5': 'Apple', '78:67:D7': 'Apple', '7C:04:D0': 'Apple', '80:E6:50': 'Apple',
'84:78:8B': 'Apple', '88:66:A5': 'Apple', '8C:85:90': 'Apple', '94:E9:6A': 'Apple',
'9C:F4:8E': 'Apple', 'A0:99:9B': 'Apple', 'A4:83:E7': 'Apple', 'A8:5C:2C': 'Apple',
'AC:1F:74': 'Apple', 'B0:19:C6': 'Apple', 'B4:F1:DA': 'Apple', 'BC:52:B7': 'Apple',
'C0:A5:3E': 'Apple', 'C4:B3:01': 'Apple', 'CC:20:E8': 'Apple', 'D0:C5:F3': 'Apple',
'D4:61:9D': 'Apple', 'D8:1C:79': 'Apple', 'E0:5F:45': 'Apple', 'E4:C6:3D': 'Apple',
'F0:B4:79': 'Apple', 'F4:0F:24': 'Apple', 'F8:4D:89': 'Apple', 'FC:D8:48': 'Apple',
# Samsung
'00:1B:66': 'Samsung', '00:21:19': 'Samsung', '00:26:37': 'Samsung', '5C:0A:5B': 'Samsung',
'8C:71:F8': 'Samsung', 'C4:73:1E': 'Samsung', '38:2C:4A': 'Samsung', '00:1E:4C': 'Samsung',
'00:12:47': 'Samsung', '00:15:99': 'Samsung', '00:17:D5': 'Samsung', '00:1D:F6': 'Samsung',
'00:21:D1': 'Samsung', '00:24:54': 'Samsung', '00:26:5D': 'Samsung', '08:D4:2B': 'Samsung',
'10:D5:42': 'Samsung', '14:49:E0': 'Samsung', '18:3A:2D': 'Samsung', '1C:66:AA': 'Samsung',
'24:4B:81': 'Samsung', '28:98:7B': 'Samsung', '2C:AE:2B': 'Samsung', '30:96:FB': 'Samsung',
'34:C3:AC': 'Samsung', '38:01:95': 'Samsung', '3C:5A:37': 'Samsung', '40:0E:85': 'Samsung',
'44:4E:1A': 'Samsung', '4C:BC:A5': 'Samsung', '50:01:BB': 'Samsung', '50:A4:D0': 'Samsung',
'54:88:0E': 'Samsung', '58:C3:8B': 'Samsung', '5C:2E:59': 'Samsung', '60:D0:A9': 'Samsung',
'64:B3:10': 'Samsung', '68:48:98': 'Samsung', '6C:2F:2C': 'Samsung', '70:F9:27': 'Samsung',
'74:45:8A': 'Samsung', '78:47:1D': 'Samsung', '7C:0B:C6': 'Samsung', '84:11:9E': 'Samsung',
'88:32:9B': 'Samsung', '8C:77:12': 'Samsung', '90:18:7C': 'Samsung', '94:35:0A': 'Samsung',
'98:52:B1': 'Samsung', '9C:02:98': 'Samsung', 'A0:0B:BA': 'Samsung', 'A4:7B:85': 'Samsung',
'A8:06:00': 'Samsung', 'AC:5F:3E': 'Samsung', 'B0:72:BF': 'Samsung', 'B4:79:A7': 'Samsung',
'BC:44:86': 'Samsung', 'C0:97:27': 'Samsung', 'C4:42:02': 'Samsung', 'CC:07:AB': 'Samsung',
'D0:22:BE': 'Samsung', 'D4:87:D8': 'Samsung', 'D8:90:E8': 'Samsung', 'E4:7C:F9': 'Samsung',
'E8:50:8B': 'Samsung', 'F0:25:B7': 'Samsung', 'F4:7B:5E': 'Samsung', 'FC:A1:3E': 'Samsung',
# Google
'54:60:09': 'Google', '00:1A:11': 'Google', 'F4:F5:D8': 'Google', '94:EB:2C': 'Google',
'64:B5:C6': 'Google', '3C:5A:B4': 'Google', 'F8:8F:CA': 'Google', '20:DF:B9': 'Google',
'54:27:1E': 'Google', '58:CB:52': 'Google', 'A4:77:33': 'Google', 'F4:0E:22': 'Google',
# Sony
'00:13:A9': 'Sony', '00:1D:28': 'Sony', '00:24:BE': 'Sony', '04:5D:4B': 'Sony',
'08:A9:5A': 'Sony', '10:4F:A8': 'Sony', '24:21:AB': 'Sony', '30:52:CB': 'Sony',
'40:B8:37': 'Sony', '58:48:22': 'Sony', '70:9E:29': 'Sony', '84:00:D2': 'Sony',
'AC:9B:0A': 'Sony', 'B4:52:7D': 'Sony', 'BC:60:A7': 'Sony', 'FC:0F:E6': 'Sony',
# Bose
'00:0C:8A': 'Bose', '04:52:C7': 'Bose', '08:DF:1F': 'Bose', '2C:41:A1': 'Bose',
'4C:87:5D': 'Bose', '60:AB:D2': 'Bose', '88:C9:E8': 'Bose', 'D8:9C:67': 'Bose',
# JBL/Harman
'00:1D:DF': 'JBL', '08:AE:D6': 'JBL', '20:3C:AE': 'JBL', '44:5E:F3': 'JBL',
'50:C9:71': 'JBL', '74:5E:1C': 'JBL', '88:C6:26': 'JBL', 'AC:12:2F': 'JBL',
# Beats (Apple subsidiary)
'00:61:71': 'Beats', '48:D6:D5': 'Beats', '9C:64:8B': 'Beats', 'A4:E9:75': 'Beats',
# Jabra/GN Audio
'00:13:17': 'Jabra', '1C:48:F9': 'Jabra', '50:C2:ED': 'Jabra', '70:BF:92': 'Jabra',
'74:5C:4B': 'Jabra', '94:16:25': 'Jabra', 'D0:81:7A': 'Jabra', 'E8:EE:CC': 'Jabra',
# Sennheiser
'00:1B:66': 'Sennheiser', '00:22:27': 'Sennheiser', 'B8:AD:3E': 'Sennheiser',
# Xiaomi
'04:CF:8C': 'Xiaomi', '0C:1D:AF': 'Xiaomi', '10:2A:B3': 'Xiaomi', '18:59:36': 'Xiaomi',
'20:47:DA': 'Xiaomi', '28:6C:07': 'Xiaomi', '34:CE:00': 'Xiaomi', '38:A4:ED': 'Xiaomi',
'44:23:7C': 'Xiaomi', '50:64:2B': 'Xiaomi', '58:44:98': 'Xiaomi', '64:09:80': 'Xiaomi',
'74:23:44': 'Xiaomi', '78:02:F8': 'Xiaomi', '7C:1C:4E': 'Xiaomi', '84:F3:EB': 'Xiaomi',
'8C:BE:BE': 'Xiaomi', '98:FA:E3': 'Xiaomi', 'A4:77:58': 'Xiaomi', 'AC:C1:EE': 'Xiaomi',
'B0:E2:35': 'Xiaomi', 'C4:0B:CB': 'Xiaomi', 'C8:47:8C': 'Xiaomi', 'D4:97:0B': 'Xiaomi',
'E4:46:DA': 'Xiaomi', 'F0:B4:29': 'Xiaomi', 'FC:64:BA': 'Xiaomi',
# Huawei
'00:18:82': 'Huawei', '00:1E:10': 'Huawei', '00:25:68': 'Huawei', '04:B0:E7': 'Huawei',
'08:63:61': 'Huawei', '10:1B:54': 'Huawei', '18:DE:D7': 'Huawei', '20:A6:80': 'Huawei',
'28:31:52': 'Huawei', '34:12:98': 'Huawei', '3C:47:11': 'Huawei', '48:00:31': 'Huawei',
'4C:50:77': 'Huawei', '5C:7D:5E': 'Huawei', '60:DE:44': 'Huawei', '70:72:3C': 'Huawei',
'78:F5:57': 'Huawei', '80:B6:86': 'Huawei', '88:53:D4': 'Huawei', '94:04:9C': 'Huawei',
'A4:99:47': 'Huawei', 'B4:15:13': 'Huawei', 'BC:76:70': 'Huawei', 'C8:D1:5E': 'Huawei',
'DC:D2:FC': 'Huawei', 'E4:68:A3': 'Huawei', 'F4:63:1F': 'Huawei',
# OnePlus/BBK
'64:A2:F9': 'OnePlus', 'C0:EE:FB': 'OnePlus', '94:65:2D': 'OnePlus',
# Fitbit
'2C:09:4D': 'Fitbit', 'C4:D9:87': 'Fitbit', 'E4:88:6D': 'Fitbit',
# Garmin
'00:1C:D1': 'Garmin', 'C4:AC:59': 'Garmin', 'E8:0F:C8': 'Garmin',
# Microsoft
'00:50:F2': 'Microsoft', '28:18:78': 'Microsoft', '60:45:BD': 'Microsoft',
'7C:1E:52': 'Microsoft', '98:5F:D3': 'Microsoft', 'B4:0E:DE': 'Microsoft',
# Intel
'00:1B:21': 'Intel', '00:1C:C0': 'Intel', '00:1E:64': 'Intel', '00:21:5C': 'Intel',
'08:D4:0C': 'Intel', '18:1D:EA': 'Intel', '34:02:86': 'Intel', '40:74:E0': 'Intel',
'48:51:B7': 'Intel', '58:A0:23': 'Intel', '64:D4:DA': 'Intel', '80:19:34': 'Intel',
'8C:8D:28': 'Intel', 'A4:4E:31': 'Intel', 'B4:6B:FC': 'Intel', 'C8:D0:83': 'Intel',
# Qualcomm/Atheros
'00:03:7F': 'Qualcomm', '00:24:E4': 'Qualcomm', '04:F0:21': 'Qualcomm',
'1C:4B:D6': 'Qualcomm', '88:71:B1': 'Qualcomm', 'A0:65:18': 'Qualcomm',
# Broadcom
'00:10:18': 'Broadcom', '00:1A:2B': 'Broadcom', '20:10:7A': 'Broadcom',
# Realtek
'00:0A:EB': 'Realtek', '00:E0:4C': 'Realtek', '48:02:2A': 'Realtek',
'52:54:00': 'Realtek', '80:EA:96': 'Realtek',
# Logitech
'00:1F:20': 'Logitech', '34:88:5D': 'Logitech', '6C:B7:49': 'Logitech',
# Lenovo
'00:09:2D': 'Lenovo', '28:D2:44': 'Lenovo', '54:EE:75': 'Lenovo', '98:FA:9B': 'Lenovo',
# Dell
'00:14:22': 'Dell', '00:1A:A0': 'Dell', '18:DB:F2': 'Dell', '34:17:EB': 'Dell',
'78:2B:CB': 'Dell', 'A4:BA:DB': 'Dell', 'E4:B9:7A': 'Dell',
# HP
'00:0F:61': 'HP', '00:14:C2': 'HP', '10:1F:74': 'HP', '28:80:23': 'HP',
'38:63:BB': 'HP', '5C:B9:01': 'HP', '80:CE:62': 'HP', 'A0:D3:C1': 'HP',
# Tile
'F8:E4:E3': 'Tile', 'C4:E7:BE': 'Tile', 'DC:54:D7': 'Tile', 'E4:B0:21': 'Tile',
# Raspberry Pi
'B8:27:EB': 'Raspberry Pi', 'DC:A6:32': 'Raspberry Pi', 'E4:5F:01': 'Raspberry Pi',
# Amazon
'00:FC:8B': 'Amazon', '10:CE:A9': 'Amazon', '34:D2:70': 'Amazon', '40:B4:CD': 'Amazon',
'44:65:0D': 'Amazon', '68:54:FD': 'Amazon', '74:C2:46': 'Amazon', '84:D6:D0': 'Amazon',
'A0:02:DC': 'Amazon', 'AC:63:BE': 'Amazon', 'B4:7C:9C': 'Amazon', 'FC:65:DE': 'Amazon',
# Skullcandy
'00:01:00': 'Skullcandy', '88:E6:03': 'Skullcandy',
# Bang & Olufsen
'00:21:3E': 'Bang & Olufsen', '78:C5:E5': 'Bang & Olufsen',
# Audio-Technica
'A0:E9:DB': 'Audio-Technica', 'EC:81:93': 'Audio-Technica',
# Plantronics/Poly
'00:1D:DF': 'Plantronics', 'B0:B4:48': 'Plantronics', 'E8:FC:AF': 'Plantronics',
# Anker
'AC:89:95': 'Anker', 'E8:AB:FA': 'Anker',
# Misc/Generic
'00:00:0A': 'Omron', '00:1A:7D': 'Cyber-Blue', '00:1E:3D': 'Alps Electric',
'00:0B:57': 'Silicon Wave', '00:02:72': 'CC&C',
}
# Prefer the external JSON table when it loads and is non-empty; it is
# easier to update than the literal above. Otherwise keep the built-in one.
_external_oui = load_oui_database()
if not _external_oui:
    print(f"[OUI] Using built-in database with {len(OUI_DATABASE)} entries")
else:
    OUI_DATABASE = _external_oui
    print(f"[OUI] Loaded {len(OUI_DATABASE)} entries from oui_database.json")
HTML_TEMPLATE = '''
INTERCEPT // Signal Intelligence
⚠️
DISCLAIMER
INTERCEPT is a signal intelligence tool designed for educational purposes only .
By using this software, you acknowledge and agree that:
This tool is intended for use by cyber security professionals and researchers only
You will only use this software in a controlled environment with proper authorization
Intercepting communications without consent may be illegal in your jurisdiction
You are solely responsible for ensuring compliance with all applicable laws and regulations
The developers assume no liability for misuse of this software
Only proceed if you understand and accept these terms.
I UNDERSTAND & ACCEPT
DECLINE
█████╗ ██████╗ ██████╗███████╗███████╗███████╗
██╔══██╗██╔════╝██╔════╝██╔════╝██╔════╝██╔════╝
███████║██║ ██║ █████╗ ███████╗███████╗
██╔══██║██║ ██║ ██╔══╝ ╚════██║╚════██║
██║ ██║╚██████╗╚██████╗███████╗███████║███████║
╚═╝ ╚═╝ ╚═════╝ ╚═════╝╚══════╝╚══════╝╚══════╝
██████╗ ███████╗███╗ ██╗██╗███████╗██████╗
██╔══██╗██╔════╝████╗ ██║██║██╔════╝██╔══██╗
██║ ██║█████╗ ██╔██╗ ██║██║█████╗ ██║ ██║
██║ ██║██╔══╝ ██║╚██╗██║██║██╔══╝ ██║ ██║
██████╔╝███████╗██║ ╚████║██║███████╗██████╔╝
╚═════╝ ╚══════╝╚═╝ ╚═══╝╚═╝╚══════╝╚═════╝
root@intercepted: ~# sudo access --grant-permission
[sudo] password for user: ********
Error: User is not in the sudoers file.
This incident will be reported.
"In a world of locked doors, the man with the key is king.
And you, my friend, just threw away the key."
TRY AGAIN
Channel Utilization (2.4 GHz)
Channel Utilization (5 GHz)
Target Signal
No target selected
-- dBm
💡 Channel Recommendation
2.4 GHz: Use channel --
5 GHz: Use channel --
🔗 Device Correlation
Analyzing WiFi/BT device patterns...
👁️ Hidden SSIDs Revealed
Monitoring probe requests...
📡 Client Probe Analysis
Clients: 0
Unique SSIDs: 0
Privacy Leaks: 0
Waiting for client probe requests...
Bluetooth Proximity Radar
N = North |
Center = Overhead (90°) |
Edge = Horizon (0°)
--- Past |
● Current |
― Future |
◉ Observer
--
Visibility
--:--
Duration
Calculate passes to see countdown
Click "Calculate Passes" to predict satellite passes for your location.
Iridium bursts will appear here when detected.
N = North |
Center = Overhead (90°) |
Edge = Horizon (0°)
--
Visibility
--:--
Duration
Waiting for pass data...
Paste TLE
Celestrak
Paste TLE data (3 lines per satellite: name, line 1, line 2)
Add Satellites from TLE
Select a category to fetch satellites from Celestrak
🚀 Space Stations
👁️ Brightest
🌤️ Weather
📡 NOAA
📻 Amateur Radio
⭐ Starlink
🛰️ GPS
📱 Iridium
Device intelligence data will appear here as signals are intercepted.
Configure settings and click "Start Decoding" to begin.
RECON
🔊 MUTE
⬇ AUTO-SCROLL ON
📄 CSV
📋 JSON
🔍 INTEL
Clear
×
📡 INTERCEPT Help
Icons
Modes
WiFi
Tips
Stats Bar Icons
📟 POCSAG messages decoded
📠 FLEX messages decoded
📨 Total messages received
🌡️ Unique sensors detected
📊 Device types found
✈️ Aircraft being tracked
🛰️ Satellites monitored
📡 WiFi Access Points
👤 Connected WiFi clients
🤝 Captured handshakes
🚁 Detected drones (click for details)
⚠️ Rogue APs (click for details)
🔵 Bluetooth devices
📍 BLE beacons detected
Mode Tab Icons
📟 Pager - POCSAG/FLEX decoder
📡 433MHz - Sensor decoder
✈️ Aircraft - ADS-B tracker
🛰️ Satellite - Pass prediction
📶 WiFi - Network scanner
🔵 Bluetooth - BT/BLE scanner
📟 Pager Mode
Decodes POCSAG and FLEX pager signals using RTL-SDR
Set frequency to local pager frequencies (common: 152-158 MHz)
Messages are displayed in real-time as they're decoded
Use presets for common pager frequencies
📡 433MHz Sensor Mode
Decodes wireless sensors on 433.92 MHz ISM band
Detects temperature, humidity, weather stations, tire pressure monitors
Supports many common protocols (Acurite, LaCrosse, Oregon Scientific, etc.)
Device intelligence builds profiles of recurring devices
✈️ Aircraft Mode
Tracks aircraft via ADS-B using dump1090 or rtl_adsb
Interactive map with real OpenStreetMap tiles
Click aircraft markers to see callsign, altitude, speed, heading
Map auto-fits to show all tracked aircraft
Emergency squawk codes highlighted in red
🛰️ Satellite Mode
Track satellites using TLE (Two-Line Element) data
Add satellites manually or fetch from Celestrak by category
Categories: Amateur, Weather, ISS, Starlink, GPS, and more
View next pass predictions with elevation and duration
Monitor for Iridium satellite bursts
📶 WiFi Mode
Requires a WiFi adapter capable of monitor mode
Click "Enable Monitor" to put adapter in monitor mode
Scans all channels or lock to a specific channel
Detects drones by SSID patterns and manufacturer OUI
Rogue AP detection flags same SSID on multiple BSSIDs
Click network rows to target for deauth or handshake capture
🔵 Bluetooth Mode
Scans for classic Bluetooth and BLE devices
Shows device names, addresses, and signal strength
Manufacturer lookup from MAC address OUI
Radar visualization shows device proximity
Monitor Mode
Enable Monitor: Puts WiFi adapter in monitor mode for passive scanning
Kill Processes: Optional - stops NetworkManager/wpa_supplicant (may drop other connections)
Some adapters rename when entering monitor mode (e.g., wlan0 → wlan0mon)
Handshake Capture
Click "Capture" on a network to start targeted handshake capture
Status panel shows capture progress and file location
Use deauth to force clients to reconnect (only on authorized networks!)
Handshake files saved to /tmp/intercept_handshake_*.cap
Drone Detection
Drones detected by SSID patterns (DJI, Parrot, Autel, etc.)
Also detected by manufacturer OUI in MAC address
Distance estimated from signal strength (approximate)
Click drone count in stats bar to see all detected drones
Rogue AP Detection
Flags networks where same SSID appears on multiple BSSIDs
Could indicate evil twin attack or legitimate multi-AP setup
Click rogue count to see which SSIDs are flagged
Proximity Alerts
Add MAC addresses to watch list for alerts when detected
Watch list persists in browser localStorage
Useful for tracking specific devices
Client Probe Analysis
Shows what networks client devices are looking for
Orange highlights indicate sensitive/private network names
Reveals user location history (home, work, hotels, airports)
Useful for security awareness and pen test reports
General Tips
Collapsible sections: Click any section header (▼) to collapse/expand
Sound alerts: Toggle sound on/off in settings for each mode
Export data: Use export buttons to save captured data as JSON
Device Intelligence: Tracks device patterns over time
Theme toggle: Click 🌙/☀️ button in header to switch dark/light mode
Keyboard Shortcuts
F1 - Open this help page
? - Open help (when not typing in a field)
Escape - Close help and modal dialogs
Requirements
Pager/433MHz: RTL-SDR dongle, rtl_fm, multimon-ng, rtl_433
Aircraft: RTL-SDR dongle, dump1090 or rtl_adsb
Satellite: Internet connection for Celestrak (optional)
WiFi: Monitor-mode capable adapter, aircrack-ng suite
Bluetooth: Bluetooth adapter, hcitool/bluetoothctl
Run as root/sudo for full functionality
Legal Notice
Only use on networks and devices you own or have authorization to test
Passive monitoring may be legal; active attacks require authorization
Check local laws regarding radio frequency monitoring
×
🔧 Tool Dependencies
Check which tools are installed for each mode. ● = Installed, ● = Missing
Quick Install (Debian/Ubuntu)
sudo apt install rtl-sdr multimon-ng rtl-433 aircrack-ng bluez dump1090-mutability hcxtools
pip install skyfield flask
'''
def check_tool(name):
    """Return True when an executable called *name* can be found on PATH."""
    resolved_path = shutil.which(name)
    return resolved_path is not None
# Comprehensive tool dependency definitions
# Layout: mode key -> {'name': display name, 'tools': {tool name -> spec}}.
# Each spec has 'required' (bool), 'description', and 'install' (a mapping
# of package manager -> command/URL). Optional spec keys: 'alternatives'
# (other executable names that also satisfy the dependency) and
# 'python_module' (check by importing instead of searching PATH).
TOOL_DEPENDENCIES = {
    'pager': {
        'name': 'Pager Decoding',
        'tools': {
            'rtl_fm': {
                'required': True,
                'description': 'RTL-SDR FM demodulator',
                'install': {
                    'apt': 'sudo apt install rtl-sdr',
                    'brew': 'brew install librtlsdr',
                    'manual': 'https://osmocom.org/projects/rtl-sdr/wiki'
                }
            },
            'multimon-ng': {
                'required': True,
                'description': 'Digital transmission decoder',
                'install': {
                    'apt': 'sudo apt install multimon-ng',
                    'brew': 'brew install multimon-ng',
                    # Fixed: the repository owner was misspelled, giving a dead link.
                    'manual': 'https://github.com/EliasOenal/multimon-ng'
                }
            },
            'rtl_test': {
                'required': False,
                'description': 'RTL-SDR device detection',
                'install': {
                    'apt': 'sudo apt install rtl-sdr',
                    'brew': 'brew install librtlsdr',
                    'manual': 'https://osmocom.org/projects/rtl-sdr/wiki'
                }
            }
        }
    },
    'sensor': {
        'name': '433MHz Sensors',
        'tools': {
            'rtl_433': {
                'required': True,
                'description': 'ISM band decoder for sensors, weather stations, TPMS',
                'install': {
                    'apt': 'sudo apt install rtl-433',
                    'brew': 'brew install rtl_433',
                    'manual': 'https://github.com/merbanan/rtl_433'
                }
            }
        }
    },
    'wifi': {
        'name': 'WiFi Reconnaissance',
        'tools': {
            'airmon-ng': {
                'required': True,
                'description': 'Monitor mode controller',
                'install': {
                    'apt': 'sudo apt install aircrack-ng',
                    'brew': 'Not available on macOS',
                    'manual': 'https://aircrack-ng.org'
                }
            },
            'airodump-ng': {
                'required': True,
                'description': 'WiFi network scanner',
                'install': {
                    'apt': 'sudo apt install aircrack-ng',
                    'brew': 'Not available on macOS',
                    'manual': 'https://aircrack-ng.org'
                }
            },
            'aireplay-ng': {
                'required': False,
                'description': 'Deauthentication / packet injection',
                'install': {
                    'apt': 'sudo apt install aircrack-ng',
                    'brew': 'Not available on macOS',
                    'manual': 'https://aircrack-ng.org'
                }
            },
            'aircrack-ng': {
                'required': False,
                'description': 'Handshake verification',
                'install': {
                    'apt': 'sudo apt install aircrack-ng',
                    'brew': 'brew install aircrack-ng',
                    'manual': 'https://aircrack-ng.org'
                }
            },
            'hcxdumptool': {
                'required': False,
                'description': 'PMKID capture tool',
                'install': {
                    'apt': 'sudo apt install hcxdumptool',
                    'brew': 'brew install hcxtools',
                    'manual': 'https://github.com/ZerBea/hcxdumptool'
                }
            },
            'hcxpcapngtool': {
                'required': False,
                'description': 'PMKID hash extractor',
                'install': {
                    'apt': 'sudo apt install hcxtools',
                    'brew': 'brew install hcxtools',
                    'manual': 'https://github.com/ZerBea/hcxtools'
                }
            }
        }
    },
    'bluetooth': {
        'name': 'Bluetooth Scanning',
        'tools': {
            'hcitool': {
                'required': False,
                'description': 'Bluetooth HCI tool (legacy)',
                'install': {
                    'apt': 'sudo apt install bluez',
                    'brew': 'Not available on macOS (use native)',
                    'manual': 'http://www.bluez.org'
                }
            },
            'bluetoothctl': {
                'required': True,
                'description': 'Modern Bluetooth controller',
                'install': {
                    'apt': 'sudo apt install bluez',
                    'brew': 'Not available on macOS (use native)',
                    'manual': 'http://www.bluez.org'
                }
            },
            'hciconfig': {
                'required': False,
                'description': 'Bluetooth adapter configuration',
                'install': {
                    'apt': 'sudo apt install bluez',
                    'brew': 'Not available on macOS',
                    'manual': 'http://www.bluez.org'
                }
            }
        }
    },
    'aircraft': {
        'name': 'Aircraft Tracking (ADS-B)',
        'tools': {
            'dump1090': {
                'required': False,
                'description': 'Mode S / ADS-B decoder (preferred)',
                'install': {
                    'apt': 'sudo apt install dump1090-mutability',
                    'brew': 'brew install dump1090-mutability',
                    'manual': 'https://github.com/flightaware/dump1090'
                },
                'alternatives': ['dump1090-mutability', 'dump1090-fa']
            },
            'rtl_adsb': {
                'required': False,
                'description': 'Simple ADS-B decoder',
                'install': {
                    'apt': 'sudo apt install rtl-sdr',
                    'brew': 'brew install librtlsdr',
                    'manual': 'https://osmocom.org/projects/rtl-sdr/wiki'
                }
            }
        }
    },
    'satellite': {
        'name': 'Satellite Tracking',
        'tools': {
            'skyfield': {
                'required': True,
                'description': 'Python orbital mechanics library',
                'install': {
                    'pip': 'pip install skyfield',
                    'manual': 'https://rhodesmill.org/skyfield/'
                },
                'python_module': True
            }
        }
    },
    'iridium': {
        'name': 'Iridium Monitoring',
        'tools': {
            'iridium-extractor': {
                'required': False,
                'description': 'Iridium burst extractor',
                'install': {
                    'manual': 'https://github.com/muccc/gr-iridium'
                }
            }
        }
    }
}
def check_all_dependencies():
    """Probe every tool in TOOL_DEPENDENCIES and summarise the result per mode.

    Returns:
        dict: mode key -> {'name', 'tools', 'ready', 'missing_required'}.
        'tools' maps each tool name to its installed flag plus the
        'required', 'description' and 'install' fields copied from its spec.
        'ready' is False as soon as one required tool is missing.
    """
    report = {}
    for mode_key, mode_cfg in TOOL_DEPENDENCIES.items():
        mode_name = mode_cfg['name']
        tool_status = {}
        missing = []

        for tool_name, spec in mode_cfg['tools'].items():
            if spec.get('python_module'):
                # Python packages are probed by importing them.
                try:
                    __import__(tool_name)
                except Exception as e:
                    print(f"[Dependency] Failed to import {tool_name}: {type(e).__name__}: {e}")
                    present = False
                else:
                    present = True
            else:
                # Executables: the primary name or any listed alternative counts.
                candidates = [tool_name] + list(spec.get('alternatives', []))
                present = any(check_tool(candidate) for candidate in candidates)

            tool_status[tool_name] = {
                'installed': present,
                'required': spec['required'],
                'description': spec['description'],
                'install': spec['install']
            }
            if spec['required'] and not present:
                missing.append(tool_name)

        report[mode_key] = {
            'name': mode_name,
            'tools': tool_status,
            'ready': not missing,
            'missing_required': missing
        }
    return report
@app.route('/dependencies')
def get_dependencies():
    """Return the dependency report as JSON, with the package manager to suggest."""
    import platform

    modes = check_all_dependencies()

    # Pick the install-instruction key that matches the host OS.
    os_name = platform.system().lower()
    manager_by_os = {'darwin': 'brew', 'linux': 'apt'}
    suggested_manager = manager_by_os.get(os_name, 'manual')

    payload = {
        'status': 'success',
        'os': os_name,
        'pkg_manager': suggested_manager,
        'modes': modes
    }
    return jsonify(payload)
def is_valid_mac(mac):
    """Validate MAC address format.

    Accepts exactly six colon-separated pairs of hex digits, in either case
    (e.g. ``AA:bb:CC:dd:EE:ff``).

    Args:
        mac: value to check; anything that is not a non-empty string is invalid.

    Returns:
        bool: True only when the whole string is a well-formed MAC address.
    """
    if not mac or not isinstance(mac, str):
        return False
    # fullmatch, not match with '$': '$' also matches just before a trailing
    # newline, which would let "AA:BB:CC:DD:EE:FF\n" pass validation.
    return re.fullmatch(r'([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}', mac) is not None
def is_valid_channel(channel):
    """Return True when *channel* converts to an integer between 1 and 200 inclusive."""
    try:
        number = int(channel)
    except (ValueError, TypeError):
        # Not convertible to an integer (e.g. None or non-numeric text).
        return False
    return 1 <= number <= 200
def detect_devices():
    """Detect RTL-SDR devices by running ``rtl_test -t`` and parsing its output.

    Returns:
        list[dict]: one ``{'index', 'name', 'serial'}`` entry per device.
        The list is empty when ``rtl_test`` is not installed, cannot be run,
        times out, or reports nothing recognisable.
    """
    devices = []
    if not check_tool('rtl_test'):
        return devices

    try:
        result = subprocess.run(
            ['rtl_test', '-t'],
            capture_output=True,
            text=True,
            timeout=5
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # Detection stays best-effort, but say why it failed instead of
        # swallowing the error. ValueError covers undecodable output.
        print(f"[RTL-SDR] Device detection failed: {type(e).__name__}: {e}")
        return devices

    # Both streams are scanned for the device listing.
    output = result.stderr + result.stdout

    # Parse device info: "<index>: <name>[, SN: <serial>]"
    device_pattern = re.compile(r'(\d+):\s+(.+?)(?:,\s*SN:\s*(\S+))?$')
    for line in output.split('\n'):
        match = device_pattern.match(line.strip())
        if match:
            devices.append({
                'index': int(match.group(1)),
                'name': match.group(2).strip().rstrip(','),
                'serial': match.group(3) or 'N/A'
            })

    if not devices:
        # No per-device lines matched: fall back to the "Found N device(s)"
        # summary and synthesise generic entries.
        found_match = re.search(r'Found (\d+) device', output)
        if found_match:
            devices.extend(
                {
                    'index': i,
                    'name': f'RTL-SDR Device {i}',
                    'serial': 'Unknown'
                }
                for i in range(int(found_match.group(1)))
            )
    return devices
# Patterns for the multimon-ng line shapes recognised by parse_multimon_output.
# POCSAG formats:
#   POCSAG512: Address: 1234567 Function: 0 Alpha: Message here
#   POCSAG1200: Address: 1234567 Function: 0 Numeric: 123-456-7890
#   POCSAG2400: Address: 1234567 Function: 0 (no message)
# FLEX formats:
#   FLEX: NNNN-NN-NN NN:NN:NN NNNN/NN/C NN.NNN [NNNNNNN] ALN Message here
#   FLEX|NNNN-NN-NN|NN:NN:NN|NNNN/NN/C|NN.NNN|NNNNNNN|ALN|Message
_POCSAG_MESSAGE_RE = re.compile(
    r'(POCSAG\d+):\s*Address:\s*(\d+)\s+Function:\s*(\d+)\s+(Alpha|Numeric):\s*(.*)'
)
_POCSAG_ADDRESS_ONLY_RE = re.compile(
    r'(POCSAG\d+):\s*Address:\s*(\d+)\s+Function:\s*(\d+)\s*$'
)
_FLEX_FULL_RE = re.compile(
    r'FLEX[:\|]\s*[\d\-]+[\s\|]+[\d:]+[\s\|]+([\d/A-Z]+)[\s\|]+([\d.]+)[\s\|]+\[?(\d+)\]?[\s\|]+(\w+)[\s\|]+(.*)'
)
_FLEX_SIMPLE_RE = re.compile(r'FLEX:\s*(.+)')


def parse_multimon_output(line):
    """Turn one multimon-ng output line into a message dict.

    Returns a dict with the keys 'protocol', 'address', 'function',
    'msg_type' and 'message', or None when the line is not a recognised
    POCSAG or FLEX record. Patterns are tried from most to least specific.
    """
    text = line.strip()

    # POCSAG with Alpha/Numeric content.
    hit = _POCSAG_MESSAGE_RE.match(text)
    if hit:
        protocol, address, function, msg_type, body = hit.groups()
        return {
            'protocol': protocol,
            'address': address,
            'function': function,
            'msg_type': msg_type,
            'message': body.strip() or '[No Message]'
        }

    # POCSAG carrying only an address and function.
    hit = _POCSAG_ADDRESS_ONLY_RE.match(text)
    if hit:
        protocol, address, function = hit.groups()
        return {
            'protocol': protocol,
            'address': address,
            'function': function,
            'msg_type': 'Tone',
            'message': '[Tone Only]'
        }

    # FLEX in the full space- or pipe-separated layout.
    hit = _FLEX_FULL_RE.match(text)
    if hit:
        return {
            'protocol': 'FLEX',
            'address': hit.group(3),
            'function': hit.group(1),
            'msg_type': hit.group(4),
            'message': hit.group(5).strip() or '[No Message]'
        }

    # Any other "FLEX: ..." line: keep the remainder as the message.
    hit = _FLEX_SIMPLE_RE.match(text)
    if hit:
        return {
            'protocol': 'FLEX',
            'address': 'Unknown',
            'function': '',
            'msg_type': 'Unknown',
            'message': hit.group(1).strip()
        }

    return None
def stream_decoder(master_fd, process):
    """Stream decoder output to queue using PTY for unbuffered output.

    Runs in a background thread. Reads the PTY master, splits the data into
    lines, and puts one event dict per line on ``output_queue``: a 'message'
    event (also passed to ``log_message``) when ``parse_multimon_output``
    recognises the line, otherwise a 'raw' event. 'status' events mark the
    start and the end of the stream.

    Args:
        master_fd: PTY master file descriptor; closed here on exit.
        process: the decoder ``Popen`` whose output is being read.
    """
    global current_process
    # Imported once here rather than on every decoded line.
    from datetime import datetime

    try:
        output_queue.put({'type': 'status', 'text': 'started'})
        buffer = ""
        while True:
            try:
                # 1 s timeout so process exit is noticed even without output.
                ready, _, _ = select.select([master_fd], [], [], 1.0)
            except Exception:
                # The fd is no longer usable (e.g. closed by the stop handler).
                break
            if ready:
                try:
                    data = os.read(master_fd, 1024)
                    if not data:
                        break
                    buffer += data.decode('utf-8', errors='replace')
                    # Emit complete lines; keep any partial tail in the buffer.
                    while '\n' in buffer:
                        line, buffer = buffer.split('\n', 1)
                        line = line.strip()
                        if not line:
                            continue
                        parsed = parse_multimon_output(line)
                        if parsed:
                            parsed['timestamp'] = datetime.now().strftime('%H:%M:%S')
                            output_queue.put({'type': 'message', **parsed})
                            log_message(parsed)
                        else:
                            output_queue.put({'type': 'raw', 'text': line})
                except OSError:
                    break
            if process.poll() is not None:
                break
    except Exception as e:
        output_queue.put({'type': 'error', 'text': str(e)})
    finally:
        try:
            os.close(master_fd)
        except OSError:
            # Already closed elsewhere; nothing left to release.
            pass
        process.wait()
        output_queue.put({'type': 'status', 'text': 'stopped'})
        with process_lock:
            # Only clear the global if it still refers to OUR process. After
            # a quick stop/start another decoder may already be registered,
            # and resetting it here would orphan that newer process.
            if current_process is process:
                current_process = None
@app.route('/')
def index():
    """Render the main page with tool availability and the detected SDR devices."""
    # Template key -> executable name probed on PATH.
    probes = (
        ('rtl_fm', 'rtl_fm'),
        ('multimon', 'multimon-ng'),
        ('rtl_433', 'rtl_433'),
    )
    tools = {key: check_tool(binary) for key, binary in probes}
    return render_template_string(HTML_TEMPLATE, tools=tools, devices=detect_devices())
@app.route('/favicon.svg')
def favicon():
    """Serve ``favicon.svg`` with the SVG MIME type."""
    icon_response = send_file('favicon.svg', mimetype='image/svg+xml')
    return icon_response
@app.route('/devices')
def get_devices():
    """Return the currently detected RTL-SDR devices as a JSON list."""
    detected = detect_devices()
    return jsonify(detected)
@app.route('/start', methods=['POST'])
def start_decoding():
    """Start pager decoding by launching ``rtl_fm | multimon-ng``.

    Reads an optional JSON object from the request body with the keys
    'frequency', 'gain', 'squelch', 'ppm', 'device' and 'protocols'; every
    key has a default, and a missing or non-object body means all defaults.

    Returns:
        JSON ``{'status': 'started', 'command': ...}`` on success, or
        ``{'status': 'error', 'message': ...}`` when a decoder is already
        running or a process could not be launched.
    """
    global current_process

    with process_lock:
        if current_process:
            return jsonify({'status': 'error', 'message': 'Already running'})

        # silent=True: a missing or malformed body yields None instead of
        # raising; anything that is not a JSON object falls back to defaults.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        freq = data.get('frequency', '929.6125')
        gain = data.get('gain', '0')
        squelch = data.get('squelch', '0')
        ppm = data.get('ppm', '0')
        device = data.get('device', '0')
        protocols = data.get('protocols', ['POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX'])

        # Clear queue
        while True:
            try:
                output_queue.get_nowait()
            except queue.Empty:
                break

        # Build multimon-ng decoder arguments; unknown protocol names are ignored.
        supported_protocols = ('POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX')
        decoders = []
        for proto in protocols:
            if proto in supported_protocols:
                decoders.extend(['-a', proto])

        # Build rtl_fm command
        # rtl_fm -d -f M -M fm -s 22050 -g -p -l - | multimon-ng -t raw -a POCSAG512 -a POCSAG1200 -a FLEX -f alpha -
        rtl_cmd = [
            'rtl_fm',
            '-d', str(device),
            '-f', f'{freq}M',
            '-M', 'fm',
            '-s', '22050',
        ]
        # Optional flags are only added when set to something other than '0'.
        if gain and gain != '0':
            rtl_cmd.extend(['-g', str(gain)])
        if ppm and ppm != '0':
            rtl_cmd.extend(['-p', str(ppm)])
        if squelch and squelch != '0':
            rtl_cmd.extend(['-l', str(squelch)])
        rtl_cmd.append('-')

        multimon_cmd = ['multimon-ng', '-t', 'raw'] + decoders + ['-f', 'alpha', '-']

        # Log the command being run
        full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd)
        print(f"Running: {full_cmd}")

        # Tracked so that a failure part-way through can release everything.
        rtl_process = None
        multimon_process = None
        master_fd = None
        slave_fd = None
        try:
            # Create pipe: rtl_fm | multimon-ng
            # Use PTY for multimon-ng to get unbuffered output
            rtl_process = subprocess.Popen(
                rtl_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            # Start a thread to monitor rtl_fm stderr for errors
            def monitor_rtl_stderr():
                for line in rtl_process.stderr:
                    err_text = line.decode('utf-8', errors='replace').strip()
                    if err_text:
                        print(f"[RTL_FM] {err_text}", flush=True)
                        output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'})

            rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr)
            rtl_stderr_thread.daemon = True
            rtl_stderr_thread.start()

            # Create a pseudo-terminal for multimon-ng output
            # This tricks it into thinking it's connected to a terminal,
            # which disables output buffering
            master_fd, slave_fd = pty.openpty()
            multimon_process = subprocess.Popen(
                multimon_cmd,
                stdin=rtl_process.stdout,
                stdout=slave_fd,
                stderr=slave_fd,
                close_fds=True
            )
            os.close(slave_fd)  # Close slave fd in parent process
            slave_fd = None
            rtl_process.stdout.close()  # Allow rtl_process to receive SIGPIPE

            multimon_process._rtl_process = rtl_process  # Store reference to kill later
            multimon_process._master_fd = master_fd  # Store for cleanup

            # Start output thread with PTY master fd
            thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process))
            thread.daemon = True
            thread.start()
        except Exception as e:
            # Launch failed part-way (e.g. multimon-ng missing after rtl_fm
            # already started): stop whatever was started and release the PTY
            # so no process keeps the SDR device busy.
            for proc in (multimon_process, rtl_process):
                if proc is None:
                    continue
                try:
                    proc.kill()
                    proc.wait(timeout=2)
                except (OSError, subprocess.TimeoutExpired):
                    pass
            for fd in (slave_fd, master_fd):
                if fd is None:
                    continue
                try:
                    os.close(fd)
                except OSError:
                    pass
            if isinstance(e, FileNotFoundError):
                return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
            return jsonify({'status': 'error', 'message': str(e)})

        # Register the decoder only once everything is up. process_lock is
        # still held, so the reader thread cannot observe a partial state.
        current_process = multimon_process

        # Send the command info to the client
        output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
        return jsonify({'status': 'started', 'command': full_cmd})
@app.route('/stop', methods=['POST'])
def stop_decoding():
    """Stop the pager decoder pipeline started by /start.

    Terminates rtl_fm first, closes the PTY master fd, then terminates
    multimon-ng. Each process gets two seconds to exit before it is killed.
    Returns {'status': 'stopped'} or {'status': 'not_running'}.
    """
    global current_process

    with process_lock:
        if not current_process:
            return jsonify({'status': 'not_running'})

        # rtl_fm is attached to the multimon-ng Popen object by /start.
        rtl_process = getattr(current_process, '_rtl_process', None)
        if rtl_process is not None:
            try:
                rtl_process.terminate()
                rtl_process.wait(timeout=2)
            except (subprocess.TimeoutExpired, OSError):
                try:
                    rtl_process.kill()
                    rtl_process.wait(timeout=2)  # reap so no zombie is left
                except (subprocess.TimeoutExpired, OSError):
                    pass

        master_fd = getattr(current_process, '_master_fd', None)
        if master_fd is not None:
            try:
                os.close(master_fd)
            except OSError:
                pass  # already closed elsewhere

        current_process.terminate()
        try:
            current_process.wait(timeout=2)
        except subprocess.TimeoutExpired:
            current_process.kill()

        current_process = None
        return jsonify({'status': 'stopped'})
@app.route('/status')
def get_status():
    """Report whether the decoder process is alive, plus the logging settings."""
    with process_lock:
        # poll() returning None means the child has not exited yet.
        is_running = bool(current_process) and current_process.poll() is None
        return jsonify({'running': is_running, 'logging': logging_enabled, 'log_file': log_file_path})
@app.route('/logging', methods=['POST'])
def toggle_logging():
    """Update the message-logging settings from the JSON body.

    Recognised keys: 'enabled' (coerced to bool) and 'log_file' (ignored when
    empty). A body that is not a JSON object changes nothing. Always responds
    with the resulting settings.
    """
    global logging_enabled, log_file_path

    data = request.json
    if not isinstance(data, dict):
        # e.g. a literal `null` body; previously this raised TypeError
        data = {}

    if 'enabled' in data:
        logging_enabled = bool(data['enabled'])
    if data.get('log_file'):
        # NOTE(review): the path is taken verbatim from the client — confirm
        # this endpoint is only reachable by trusted users.
        log_file_path = data['log_file']

    return jsonify({'logging': logging_enabled, 'log_file': log_file_path})
def log_message(msg):
    """Append one decoded message to the log file; no-op while logging is off.

    msg is read with .get() for 'protocol', 'address' and 'message'. Any
    failure is printed to stdout instead of being raised.
    """
    if logging_enabled:
        try:
            with open(log_file_path, 'a') as log_handle:
                from datetime import datetime
                stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                protocol = msg.get('protocol', 'UNKNOWN')
                address = msg.get('address', '')
                text = msg.get('message', '')
                log_handle.write(f"{stamp} | {protocol} | {address} | {text}\n")
        except Exception as exc:
            print(f"[ERROR] Failed to log message: {exc}", flush=True)
@app.route('/killall', methods=['POST'])
def kill_all():
    """Kill all decoder and WiFi processes.

    Runs `pkill -f <name>` for each known tool, so any process whose command
    line contains that name is signalled, not only children of this app.
    The pager, sensor and WiFi process handles are then reset to None.
    Returns the names for which pkill reported a match.
    """
    global current_process, sensor_process, wifi_process

    killed = []
    processes_to_kill = (
        'rtl_fm', 'multimon-ng', 'rtl_433',
        'airodump-ng', 'aireplay-ng', 'airmon-ng',
    )
    for proc in processes_to_kill:
        try:
            result = subprocess.run(['pkill', '-f', proc], capture_output=True)
        except (OSError, subprocess.SubprocessError):
            # pkill missing or not runnable: best effort, try the next name
            continue
        if result.returncode == 0:
            killed.append(proc)

    with process_lock:
        current_process = None
    with sensor_lock:
        sensor_process = None
    with wifi_lock:
        wifi_process = None

    return jsonify({'status': 'killed', 'processes': killed})
@app.route('/stream')
def stream():
    """Server-sent-events endpoint that relays items from output_queue."""
    def event_source():
        # Emit a keepalive whenever the queue stays empty for one second.
        while True:
            try:
                payload = output_queue.get(timeout=1)
            except queue.Empty:
                payload = {'type': 'keepalive'}
            yield f"data: {json.dumps(payload)}\n\n"

    sse = Response(event_source(), mimetype='text/event-stream')
    for header, value in (
        ('Cache-Control', 'no-cache'),
        ('X-Accel-Buffering', 'no'),
        ('Connection', 'keep-alive'),
    ):
        sse.headers[header] = value
    return sse
# ============== RTL_433 SENSOR ROUTES ==============
def stream_sensor_output(process):
    """Relay rtl_433 output from process.stdout into sensor_queue.

    Each line that parses as a JSON object is tagged with type 'sensor' and
    queued (and appended to the log file when logging is enabled). Anything
    else is queued as a 'raw' text event. When the stream ends the function
    waits for the process, queues a 'stopped' status and clears the global
    sensor_process, but only if it still refers to this process.
    """
    global sensor_process
    from datetime import datetime

    try:
        sensor_queue.put({'type': 'status', 'text': 'started'})

        for raw in iter(process.stdout.readline, b''):
            line = raw.decode('utf-8', errors='replace').strip()
            if not line:
                continue

            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                data = None

            if not isinstance(data, dict):
                # Not a JSON object (plain text, or a bare list/number):
                # pass it through rather than letting data['type'] raise
                # and end the whole reader loop.
                sensor_queue.put({'type': 'raw', 'text': line})
                continue

            data['type'] = 'sensor'
            sensor_queue.put(data)

            if logging_enabled:
                try:
                    with open(log_file_path, 'a') as f:
                        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        f.write(f"{timestamp} | {data.get('model', 'Unknown')} | {json.dumps(data)}\n")
                except Exception as e:
                    # Logging is best effort; keep streaming but say why it failed.
                    print(f"[ERROR] Failed to log sensor data: {e}", flush=True)

    except Exception as e:
        sensor_queue.put({'type': 'error', 'text': str(e)})
    finally:
        process.wait()
        sensor_queue.put({'type': 'status', 'text': 'stopped'})
        with sensor_lock:
            # A newer process may have been started since; do not clobber it.
            if sensor_process is process:
                sensor_process = None
@app.route('/start_sensor', methods=['POST'])
def start_sensor():
    """Start rtl_433 with JSON output and stream its events.

    Reads frequency, gain, ppm and device from the JSON body (each has a
    default). Two daemon threads are started: one runs
    stream_sensor_output() on stdout, the other forwards stderr lines to
    sensor_queue as 'info' events. Returns a JSON status object.
    """
    global sensor_process

    with sensor_lock:
        if sensor_process:
            return jsonify({'status': 'error', 'message': 'Sensor already running'})

        # A JSON body of `null` yields None; treat it as "all defaults".
        data = request.json or {}
        freq = data.get('frequency', '433.92')
        gain = data.get('gain', '0')
        ppm = data.get('ppm', '0')
        device = data.get('device', '0')

        # Drop events left over from a previous run.
        while True:
            try:
                sensor_queue.get_nowait()
            except queue.Empty:
                break

        # rtl_433 -d <dev> -f <freq>M -F json [-g <gain>] [-p <ppm>]
        cmd = [
            'rtl_433',
            '-d', str(device),
            '-f', f'{freq}M',
            '-F', 'json'
        ]
        if gain and gain != '0':
            cmd.extend(['-g', str(gain)])
        if ppm and ppm != '0':
            cmd.extend(['-p', str(ppm)])

        full_cmd = ' '.join(cmd)
        print(f"Running: {full_cmd}")

        try:
            # No bufsize=1: line buffering is not supported for binary pipes
            # and only produces a RuntimeWarning.
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            sensor_process = proc

            thread = threading.Thread(target=stream_sensor_output, args=(proc,))
            thread.daemon = True
            thread.start()

            # Bound to the local `proc`: the global sensor_process can be
            # reset to None before this thread gets to run.
            def monitor_stderr():
                for line in proc.stderr:
                    err = line.decode('utf-8', errors='replace').strip()
                    if err:
                        print(f"[rtl_433] {err}")
                        sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'})

            stderr_thread = threading.Thread(target=monitor_stderr)
            stderr_thread.daemon = True
            stderr_thread.start()

            sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
            return jsonify({'status': 'started', 'command': full_cmd})
        except FileNotFoundError:
            return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/stop_sensor', methods=['POST'])
def stop_sensor():
    """Terminate the running rtl_433 process, killing it after a 2 s grace period."""
    global sensor_process

    with sensor_lock:
        if not sensor_process:
            return jsonify({'status': 'not_running'})

        running = sensor_process
        running.terminate()
        try:
            running.wait(timeout=2)
        except subprocess.TimeoutExpired:
            running.kill()
        sensor_process = None
        return jsonify({'status': 'stopped'})
@app.route('/stream_sensor')
def stream_sensor():
    """Server-sent-events endpoint that relays items from sensor_queue."""
    def event_source():
        # Emit a keepalive whenever the queue stays empty for one second.
        while True:
            try:
                payload = sensor_queue.get(timeout=1)
            except queue.Empty:
                payload = {'type': 'keepalive'}
            yield f"data: {json.dumps(payload)}\n\n"

    sse = Response(event_source(), mimetype='text/event-stream')
    for header, value in (
        ('Cache-Control', 'no-cache'),
        ('X-Accel-Buffering', 'no'),
        ('Connection', 'keep-alive'),
    ):
        sse.headers[header] = value
    return sse
# ============== WIFI RECONNAISSANCE ROUTES ==============
def detect_wifi_interfaces():
    """Detect available WiFi interfaces.

    On macOS, parses `networksetup -listallhardwareports` and looks for
    wireless keywords in `system_profiler SPUSBDataType`. Elsewhere, parses
    `iw dev`, falling back to `iwconfig` when iw is not installed.

    Returns a list of dicts with keys 'name', 'type', 'monitor_capable' and
    'status'. Detection errors are printed or ignored; the list may be empty.
    """
    interfaces = []

    import platform

    if platform.system() == 'Darwin':  # macOS
        try:
            result = subprocess.run(['networksetup', '-listallhardwareports'],
                                    capture_output=True, text=True, timeout=5)
            lines = result.stdout.split('\n')
            for i, line in enumerate(lines):
                if 'Wi-Fi' in line or 'AirPort' in line:
                    # The device name is expected on one of the next two lines.
                    for candidate in lines[i + 1:i + 3]:
                        if 'Device:' in candidate:
                            device = candidate.split('Device:')[1].strip()
                            interfaces.append({
                                'name': device,
                                'type': 'internal',
                                'monitor_capable': False,  # macOS internal usually can't
                                'status': 'up'
                            })
                            break
        except Exception as e:
            print(f"[WiFi] Error detecting macOS interfaces: {e}")

        # Check for USB WiFi adapters
        try:
            result = subprocess.run(['system_profiler', 'SPUSBDataType'],
                                    capture_output=True, text=True, timeout=10)
            if 'Wireless' in result.stdout or 'WLAN' in result.stdout or '802.11' in result.stdout:
                interfaces.append({
                    'name': 'USB WiFi Adapter',
                    'type': 'usb',
                    'monitor_capable': True,
                    'status': 'detected'
                })
        except Exception:
            pass

    else:  # Linux
        try:
            result = subprocess.run(['iw', 'dev'], capture_output=True, text=True, timeout=5)
            current_iface = None
            for line in result.stdout.split('\n'):
                fields = line.split()
                if len(fields) < 2:
                    continue
                if fields[0] == 'Interface':
                    current_iface = fields[1]
                elif current_iface and fields[0] == 'type':
                    # Match the keyword itself: a substring test would also
                    # hit e.g. an "ssid" line whose name contains "type".
                    interfaces.append({
                        'name': current_iface,
                        'type': fields[-1],
                        'monitor_capable': True,
                        'status': 'up'
                    })
                    current_iface = None
        except FileNotFoundError:
            # iw is not installed; try iwconfig instead
            try:
                result = subprocess.run(['iwconfig'], capture_output=True, text=True, timeout=5)
                for line in result.stdout.split('\n'):
                    if 'IEEE 802.11' in line:
                        interfaces.append({
                            'name': line.split()[0],
                            'type': 'managed',
                            'monitor_capable': True,
                            'status': 'up'
                        })
            except Exception:
                pass
        except Exception as e:
            print(f"[WiFi] Error detecting Linux interfaces: {e}")

    return interfaces
@app.route('/wifi/interfaces')
def get_wifi_interfaces():
    """Return detected WiFi interfaces, tool availability and the current monitor interface."""
    detected = detect_wifi_interfaces()
    binaries = (
        ('airmon', 'airmon-ng'),
        ('airodump', 'airodump-ng'),
        ('aireplay', 'aireplay-ng'),
        ('iw', 'iw'),
    )
    availability = {key: check_tool(binary) for key, binary in binaries}
    return jsonify({
        'interfaces': detected,
        'tools': availability,
        'monitor_interface': wifi_monitor_interface,
    })
def _list_wireless_interfaces():
    """Collect wireless interface names from iwconfig, /sys/class/net and `ip link`."""
    found = set()

    try:
        result = subprocess.run(['iwconfig'], capture_output=True, text=True, timeout=5)
        for line in result.stdout.split('\n'):
            if line and not line.startswith(' ') and 'no wireless' not in line.lower():
                fields = line.split()
                if fields:
                    found.add(fields[0])
    except (OSError, subprocess.SubprocessError):
        pass

    try:
        for iface in os.listdir('/sys/class/net'):
            if os.path.exists(f'/sys/class/net/{iface}/wireless'):
                found.add(iface)
    except OSError:
        pass

    try:
        result = subprocess.run(['ip', 'link', 'show'], capture_output=True, text=True, timeout=5)
        for match in re.finditer(r'^\d+:\s+(\S+):', result.stdout, re.MULTILINE):
            iface = match.group(1).rstrip(':')
            # Keep only names that look wireless (wl*, *mon*)
            if iface.startswith('wl') or 'mon' in iface:
                found.add(iface)
    except (OSError, subprocess.SubprocessError):
        pass

    return found


def _iface_in_monitor_mode(iface):
    """Return True when `iwconfig <iface>` reports Mode:Monitor."""
    try:
        result = subprocess.run(['iwconfig', iface], capture_output=True, text=True, timeout=5)
    except (OSError, subprocess.SubprocessError):
        return False
    return 'Mode:Monitor' in result.stdout


def _resolve_monitor_interface(interface, output, interfaces_before, interfaces_after):
    """Work out which interface airmon-ng put into monitor mode."""
    new_interfaces = interfaces_after - interfaces_before
    print(f"[WiFi] New interfaces detected: {new_interfaces}", flush=True)

    monitor_iface = None

    # Method 1: a new interface appeared; prefer one with 'mon' in its name
    if new_interfaces:
        monitor_iface = next((name for name in new_interfaces if 'mon' in name), None)
        if not monitor_iface:
            monitor_iface = next(iter(new_interfaces))

    # Method 2: parse the airmon-ng output
    if not monitor_iface:
        patterns = [
            r'monitor mode.*enabled.*on\s+(\S+)',  # "monitor mode enabled on wlan0mon"
            r'\(monitor mode.*enabled.*?(\S+mon)\)',  # "(monitor mode enabled on wlan0mon)"
            r'created\s+(\S+mon)',  # "created wlan0mon"
            r'\bon\s+(\S+mon)\b',  # "on wlan0mon"
            r'\b(\S+mon)\b.*monitor',  # "wlan0mon in monitor"
            r'\b(' + re.escape(interface) + r'mon)\b',  # exact match: interfacemon
        ]
        for pattern in patterns:
            match = re.search(pattern, output, re.IGNORECASE)
            if match:
                monitor_iface = match.group(1)
                print(f"[WiFi] Found monitor interface via pattern '{pattern}': {monitor_iface}", flush=True)
                break

    # Method 3: the original interface itself switched to monitor mode
    if not monitor_iface and _iface_in_monitor_mode(interface):
        monitor_iface = interface
        print(f"[WiFi] Original interface {interface} is now in monitor mode", flush=True)

    # Method 4: interface + 'mon' exists
    if not monitor_iface and interface + 'mon' in interfaces_after:
        monitor_iface = interface + 'mon'

    # Method 5: last resort - assume interface + 'mon'
    if not monitor_iface:
        monitor_iface = interface + 'mon'
        print(f"[WiFi] Assuming monitor interface: {monitor_iface}", flush=True)

    # Verify the chosen interface exists; otherwise look for any interface
    # that is actually in monitor mode.
    try:
        result = subprocess.run(['ip', 'link', 'show', monitor_iface],
                                capture_output=True, text=True, timeout=5)
        exists = result.returncode == 0
    except (OSError, subprocess.SubprocessError):
        exists = True  # cannot verify; keep the current guess
    if not exists:
        for iface in interfaces_after:
            if ('mon' in iface or iface.startswith('wl')) and _iface_in_monitor_mode(iface):
                monitor_iface = iface
                print(f"[WiFi] Found working monitor interface: {monitor_iface}", flush=True)
                break

    return monitor_iface


@app.route('/wifi/monitor', methods=['POST'])
def toggle_monitor_mode():
    """Enable or disable monitor mode on an interface.

    JSON body: 'interface' (required), 'action' ('start' or 'stop', default
    'start') and, for start, 'kill_processes' (run `airmon-ng check kill`
    first). airmon-ng is preferred; iw/ip is the fallback. On success the
    global wifi_monitor_interface is updated. Any other action value is
    rejected with 'Unknown action'.
    """
    global wifi_monitor_interface

    data = request.json or {}
    interface = data.get('interface')
    action = data.get('action', 'start')  # 'start' or 'stop'

    if not interface:
        return jsonify({'status': 'error', 'message': 'No interface specified'})
    if action not in ('start', 'stop'):
        # Previously every value other than 'start' silently disabled monitor mode.
        return jsonify({'status': 'error', 'message': 'Unknown action'})

    no_tools = 'No monitor mode tools available. Install aircrack-ng (brew install aircrack-ng) or iw.'

    if action == 'start':
        if check_tool('airmon-ng'):
            try:
                # Snapshot interfaces BEFORE enabling monitor mode
                interfaces_before = _list_wireless_interfaces()
                print(f"[WiFi] Interfaces before monitor mode: {interfaces_before}", flush=True)

                # Optionally kill interfering processes (can drop other connections)
                if data.get('kill_processes', False):
                    print("[WiFi] Killing interfering processes...", flush=True)
                    subprocess.run(['airmon-ng', 'check', 'kill'], capture_output=True, timeout=10)
                else:
                    print("[WiFi] Skipping process kill (other connections preserved)", flush=True)

                result = subprocess.run(['airmon-ng', 'start', interface],
                                        capture_output=True, text=True, timeout=15)
                output = result.stdout + result.stderr
                print(f"[WiFi] airmon-ng output:\n{output}", flush=True)

                time.sleep(1)  # give the system time to register the new interface
                interfaces_after = _list_wireless_interfaces()
                print(f"[WiFi] Interfaces after monitor mode: {interfaces_after}", flush=True)

                wifi_monitor_interface = _resolve_monitor_interface(
                    interface, output, interfaces_before, interfaces_after)
                wifi_queue.put({'type': 'info', 'text': f'Monitor mode enabled on {wifi_monitor_interface}'})
                return jsonify({'status': 'success', 'monitor_interface': wifi_monitor_interface})
            except Exception as e:
                import traceback
                print(f"[WiFi] Error enabling monitor mode: {e}\n{traceback.format_exc()}", flush=True)
                return jsonify({'status': 'error', 'message': str(e)})

        # Fallback to iw (Linux)
        if check_tool('iw'):
            try:
                subprocess.run(['ip', 'link', 'set', interface, 'down'], capture_output=True)
                subprocess.run(['iw', interface, 'set', 'monitor', 'control'], capture_output=True)
                subprocess.run(['ip', 'link', 'set', interface, 'up'], capture_output=True)
                wifi_monitor_interface = interface
                return jsonify({'status': 'success', 'monitor_interface': interface})
            except Exception as e:
                return jsonify({'status': 'error', 'message': str(e)})

        return jsonify({'status': 'error', 'message': no_tools})

    # action == 'stop'
    if check_tool('airmon-ng'):
        try:
            subprocess.run(['airmon-ng', 'stop', wifi_monitor_interface or interface],
                           capture_output=True, text=True, timeout=15)
            wifi_monitor_interface = None
            return jsonify({'status': 'success', 'message': 'Monitor mode disabled'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})

    if check_tool('iw'):
        try:
            subprocess.run(['ip', 'link', 'set', interface, 'down'], capture_output=True)
            subprocess.run(['iw', interface, 'set', 'type', 'managed'], capture_output=True)
            subprocess.run(['ip', 'link', 'set', interface, 'up'], capture_output=True)
            wifi_monitor_interface = None
            return jsonify({'status': 'success', 'message': 'Monitor mode disabled'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})

    # Neither tool is installed; this used to be reported as "Unknown action".
    return jsonify({'status': 'error', 'message': no_tools})
def parse_airodump_csv(csv_path):
    """Parse an airodump-ng CSV output file.

    The file holds two blank-line-separated tables: access points (first
    column 'BSSID') and stations (first column 'Station MAC'). The table is
    chosen by the header's first column, because the station header also
    contains the words 'BSSID' and 'ESSIDs'.

    Returns (networks, clients): dicts keyed by BSSID and by station MAC.
    All values are kept as strings. An empty ESSID becomes 'Hidden'. On any
    error the problem is printed and whatever was parsed so far is returned.
    """
    networks = {}
    clients = {}

    try:
        with open(csv_path, 'r', errors='replace') as f:
            content = f.read()

        for section in content.split('\n\n'):
            lines = section.strip().split('\n')
            first_column = lines[0].split(',')[0].strip()

            if first_column == 'BSSID':
                for line in lines[1:]:
                    raw = line.split(',')
                    parts = [p.strip() for p in raw]
                    if len(parts) < 14:
                        continue
                    bssid = parts[0]
                    if not bssid or ':' not in bssid:
                        continue
                    if len(raw) >= 15:
                        # ESSID is column 13 and the last column is the key;
                        # rejoin so an ESSID containing commas stays whole.
                        essid = ','.join(raw[13:-1]).strip()
                    else:
                        essid = parts[13]
                    networks[bssid] = {
                        'bssid': bssid,
                        'first_seen': parts[1],
                        'last_seen': parts[2],
                        'channel': parts[3],
                        'speed': parts[4],
                        'privacy': parts[5],
                        'cipher': parts[6],
                        'auth': parts[7],
                        'power': parts[8],
                        'beacons': parts[9],
                        'ivs': parts[10],
                        'lan_ip': parts[11],
                        'essid': essid or 'Hidden'
                    }

            elif first_column == 'Station MAC':
                for line in lines[1:]:
                    raw = line.split(',')
                    parts = [p.strip() for p in raw]
                    if len(parts) < 6:
                        continue
                    station = parts[0]
                    if not station or ':' not in station:
                        continue
                    clients[station] = {
                        'mac': station,
                        'first_seen': parts[1],
                        'last_seen': parts[2],
                        'power': parts[3],
                        'packets': parts[4],
                        'bssid': parts[5],
                        # Everything after column 5 is the comma-separated
                        # list of probed ESSIDs; keep all of it.
                        'probes': ','.join(raw[6:]).strip(),
                        'vendor': get_manufacturer(station)
                    }
    except Exception as e:
        print(f"[WiFi] Error parsing CSV: {e}")

    return networks, clients
def stream_airodump_output(process, csv_path):
    """Watch a running airodump-ng process and publish its findings.

    While the process is alive: stderr is drained without blocking and
    lines that are not 'CH'/'Elapsed' progress output are queued as errors.
    The file csv_path + '-01.csv' is re-parsed every 2 seconds once it
    exists, and network/client events are queued for the SSE stream. A hint
    is queued if no CSV appears within 5 seconds. On exit the remaining
    stderr and a non-zero exit code are reported, a 'stopped' status is
    queued and the global wifi_process is cleared, but only if it still
    refers to this process.
    """
    global wifi_process, wifi_networks, wifi_clients

    csv_file = csv_path + '-01.csv'

    try:
        wifi_queue.put({'type': 'status', 'text': 'started'})
        last_parse = 0
        start_time = time.time()
        csv_found = False

        # Switch stderr to non-blocking once, instead of on every iteration.
        try:
            import fcntl
            fd = process.stderr.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
            stderr_nonblocking = True
        except Exception:
            # Without non-blocking mode a read() here would stall the loop.
            stderr_nonblocking = False

        while process.poll() is None:
            if stderr_nonblocking:
                try:
                    stderr_data = process.stderr.read()
                    if stderr_data:
                        stderr_text = stderr_data.decode('utf-8', errors='replace').strip()
                        # Filter out progress updates, report actual errors
                        for line in stderr_text.split('\n'):
                            line = line.strip()
                            if line and not line.startswith('CH') and not line.startswith('Elapsed'):
                                wifi_queue.put({'type': 'error', 'text': f'airodump-ng: {line}'})
                except Exception:
                    pass

            current_time = time.time()
            if current_time - last_parse >= 2 and os.path.exists(csv_file):
                csv_found = True
                networks, clients = parse_airodump_csv(csv_file)

                for bssid, net in networks.items():
                    action = 'new' if bssid not in wifi_networks else 'update'
                    wifi_queue.put({'type': 'network', 'action': action, **net})

                # Only newly seen clients are announced
                for mac, client in clients.items():
                    if mac not in wifi_clients:
                        wifi_queue.put({'type': 'client', 'action': 'new', **client})

                wifi_networks = networks
                wifi_clients = clients
                last_parse = current_time

            if current_time - start_time > 5 and not csv_found:
                wifi_queue.put({'type': 'error', 'text': 'No scan data after 5 seconds. Check if monitor mode is properly enabled.'})
                start_time = current_time + 30  # push the next warning out; don't spam

            time.sleep(0.5)

        # Process exited - capture any remaining stderr
        try:
            remaining_stderr = process.stderr.read()
            if remaining_stderr:
                stderr_text = remaining_stderr.decode('utf-8', errors='replace').strip()
                if stderr_text:
                    wifi_queue.put({'type': 'error', 'text': f'airodump-ng exited: {stderr_text}'})
        except Exception:
            pass

        exit_code = process.returncode
        if exit_code != 0 and exit_code is not None:
            wifi_queue.put({'type': 'error', 'text': f'airodump-ng exited with code {exit_code}'})

    except Exception as e:
        wifi_queue.put({'type': 'error', 'text': str(e)})
    finally:
        process.wait()
        wifi_queue.put({'type': 'status', 'text': 'stopped'})
        with wifi_lock:
            # A newer scan/capture may own wifi_process by now; leave it alone.
            if wifi_process is process:
                wifi_process = None
@app.route('/wifi/scan/start', methods=['POST'])
def start_wifi_scan():
    """Start WiFi scanning with airodump-ng.

    JSON body: 'interface' (defaults to the current monitor interface),
    'channel' (omit for channel hopping) and 'band' (default 'abg').
    Previous scan results, queued events and stale output files are
    discarded. If airodump-ng exits within half a second its output is
    turned into an error message; otherwise a daemon thread running
    stream_airodump_output() is started. Returns a JSON status object.
    """
    global wifi_process, wifi_networks, wifi_clients

    with wifi_lock:
        if wifi_process:
            return jsonify({'status': 'error', 'message': 'Scan already running'})

        # A JSON body of `null` yields None; treat it as "all defaults".
        data = request.json or {}
        interface = data.get('interface') or wifi_monitor_interface
        channel = data.get('channel')  # None = channel hopping
        band = data.get('band', 'abg')  # 'a' = 5GHz, 'bg' = 2.4GHz, 'abg' = both

        if not interface:
            return jsonify({'status': 'error', 'message': 'No monitor interface available. Enable monitor mode first.'})

        # Clear previous data
        wifi_networks = {}
        wifi_clients = {}
        while True:
            try:
                wifi_queue.get_nowait()
            except queue.Empty:
                break

        csv_path = '/tmp/intercept_wifi'

        # Remove old output so airodump-ng writes the "-01" files again
        for stale in (csv_path + '-01.csv', csv_path + '-01.cap'):
            try:
                os.remove(stale)
            except OSError:
                pass

        cmd = [
            'airodump-ng',
            '-w', csv_path,
            '--output-format', 'csv,pcap',
            '--band', band,
            interface
        ]
        if channel:
            cmd.extend(['-c', str(channel)])

        print(f"[WiFi] Running: {' '.join(cmd)}")

        try:
            wifi_process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            # Wait briefly to check if the process fails immediately
            time.sleep(0.5)

            if wifi_process.poll() is not None:
                stderr_output = wifi_process.stderr.read().decode('utf-8', errors='replace').strip()
                stdout_output = wifi_process.stdout.read().decode('utf-8', errors='replace').strip()
                exit_code = wifi_process.returncode
                wifi_process = None

                error_msg = stderr_output or stdout_output or f'Process exited with code {exit_code}'
                # Strip ANSI escape codes
                error_msg = re.sub(r'\x1b\[[0-9;]*m', '', error_msg)

                # Common error explanations
                if 'No such device' in error_msg or 'No such interface' in error_msg:
                    error_msg = f'Interface "{interface}" not found. Make sure monitor mode is enabled.'
                elif 'Operation not permitted' in error_msg:
                    error_msg = 'Permission denied. Try running with sudo.'
                elif 'monitor mode' in error_msg.lower():
                    error_msg = f'Interface "{interface}" is not in monitor mode. Enable monitor mode first.'
                elif 'Failed initialising' in error_msg:
                    error_msg = f'Failed to initialize "{interface}". The adapter may have been disconnected or monitor mode is not active. Try disabling and re-enabling monitor mode.'

                return jsonify({'status': 'error', 'message': error_msg})

            thread = threading.Thread(target=stream_airodump_output, args=(wifi_process, csv_path))
            thread.daemon = True
            thread.start()

            wifi_queue.put({'type': 'info', 'text': f'Started scanning on {interface}'})
            return jsonify({'status': 'started', 'interface': interface})
        except FileNotFoundError:
            return jsonify({'status': 'error', 'message': 'airodump-ng not found. Install aircrack-ng suite (brew install aircrack-ng).'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/wifi/scan/stop', methods=['POST'])
def stop_wifi_scan():
    """Terminate the running WiFi scan, killing it after a 3 s grace period."""
    global wifi_process

    with wifi_lock:
        if not wifi_process:
            return jsonify({'status': 'not_running'})

        scanner = wifi_process
        scanner.terminate()
        try:
            scanner.wait(timeout=3)
        except subprocess.TimeoutExpired:
            scanner.kill()
        wifi_process = None
        return jsonify({'status': 'stopped'})
@app.route('/wifi/deauth', methods=['POST'])
def send_deauth():
    """Run aireplay-ng --deauth against a BSSID (and optionally one client).

    JSON body: 'bssid' (required), 'client' (defaults to broadcast), 'count'
    (1-100, anything else becomes 5) and 'interface' (defaults to the
    current monitor interface). Both MAC addresses must pass is_valid_mac().
    """
    payload = request.json
    bssid = payload.get('bssid')
    client_mac = payload.get('client', 'FF:FF:FF:FF:FF:FF')  # broadcast by default
    packet_count = payload.get('count', 5)
    iface = payload.get('interface') or wifi_monitor_interface

    def failure(text):
        return jsonify({'status': 'error', 'message': text})

    if not bssid:
        return failure('Target BSSID required')

    # MAC addresses are validated before they reach the command line
    if not is_valid_mac(bssid):
        return failure('Invalid BSSID format')
    if not is_valid_mac(client_mac):
        return failure('Invalid client MAC format')

    # Out-of-range or non-numeric counts fall back to 5
    try:
        packet_count = int(packet_count)
    except (ValueError, TypeError):
        packet_count = 5
    else:
        if not 1 <= packet_count <= 100:
            packet_count = 5

    if not iface:
        return failure('No monitor interface')
    if not check_tool('aireplay-ng'):
        return failure('aireplay-ng not found')

    try:
        argv = ['aireplay-ng', '--deauth', str(packet_count), '-a', bssid, '-c', client_mac, iface]
        wifi_queue.put({'type': 'info', 'text': f'Sending {packet_count} deauth packets to {bssid}'})
        outcome = subprocess.run(argv, capture_output=True, text=True, timeout=30)
        if outcome.returncode != 0:
            return jsonify({'status': 'error', 'message': outcome.stderr})
        return jsonify({'status': 'success', 'message': f'Sent {packet_count} deauth packets'})
    except subprocess.TimeoutExpired:
        return jsonify({'status': 'success', 'message': 'Deauth sent (timed out waiting for completion)'})
    except Exception as exc:
        return failure(str(exc))
@app.route('/wifi/handshake/capture', methods=['POST'])
def capture_handshake():
    """Start a targeted airodump-ng capture for one BSSID on one channel.

    JSON body: 'bssid' and 'channel' (both required and validated) and
    'interface' (defaults to the current monitor interface). The capture is
    written to /tmp/intercept_handshake_<bssid without colons>-01.cap and
    the process is stored in the global wifi_process. Returns a JSON status
    object that includes the capture file path.
    """
    global wifi_process

    data = request.json or {}
    target_bssid = data.get('bssid')
    channel = data.get('channel')
    interface = data.get('interface') or wifi_monitor_interface

    if not target_bssid or not channel:
        return jsonify({'status': 'error', 'message': 'BSSID and channel required'})

    # Validate inputs before they are used in a command line and a file path
    if not is_valid_mac(target_bssid):
        return jsonify({'status': 'error', 'message': 'Invalid BSSID format'})
    if not is_valid_channel(channel):
        return jsonify({'status': 'error', 'message': 'Invalid channel'})
    if not interface:
        # Otherwise Popen fails on a None argument with an opaque TypeError
        return jsonify({'status': 'error', 'message': 'No monitor interface'})

    with wifi_lock:
        # Nothing reaps a finished capture, so a dead process must not block a new one.
        if wifi_process and wifi_process.poll() is None:
            return jsonify({'status': 'error', 'message': 'Scan already running. Stop it first.'})

        capture_path = f'/tmp/intercept_handshake_{target_bssid.replace(":", "")}'

        cmd = [
            'airodump-ng',
            '-c', str(channel),
            '--bssid', target_bssid,
            '-w', capture_path,
            '--output-format', 'pcap',
            interface
        ]

        try:
            # Nothing reads this process's output; a PIPE would eventually
            # fill up and block airodump-ng, so discard it instead.
            wifi_process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            wifi_queue.put({'type': 'info', 'text': f'Capturing handshakes for {target_bssid} on channel {channel}'})
            return jsonify({'status': 'started', 'capture_file': capture_path + '-01.cap'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/wifi/handshake/status', methods=['POST'])
def check_handshake_status():
    """Check if a handshake has been captured in the specified file.

    JSON body: 'file' (must start with /tmp/intercept_handshake_ and contain
    no '..') and 'bssid'. When the file exists and the BSSID is valid,
    aircrack-ng is run on it and its output is searched for a handshake
    count. Returns capture state ('running'/'stopped'), file details and
    'handshake_found'.
    """
    data = request.json or {}
    capture_file = data.get('file', '')
    target_bssid = data.get('bssid', '')

    # Security: only accept paths that look like our own capture files
    if not capture_file.startswith('/tmp/intercept_handshake_') or '..' in capture_file:
        return jsonify({'status': 'error', 'message': 'Invalid capture file path'})

    def capture_state():
        # Read the shared handle under its lock every time
        with wifi_lock:
            alive = wifi_process is not None and wifi_process.poll() is None
        return 'running' if alive else 'stopped'

    if not os.path.exists(capture_file):
        return jsonify({
            'status': capture_state(),
            'file_exists': False,
            'handshake_found': False
        })

    file_size = os.path.getsize(capture_file)

    handshake_found = False
    try:
        if target_bssid and is_valid_mac(target_bssid):
            result = subprocess.run(
                ['aircrack-ng', '-a', '2', '-b', target_bssid, capture_file],
                capture_output=True,
                text=True,
                timeout=10
            )
            output = result.stdout + result.stderr
            # Look for "<N> handshake" in the output. Parsing the number
            # avoids the substring trap where "10 handshake" contains
            # "0 handshake".
            count = re.search(r'(\d+)\s+handshake', output, re.IGNORECASE)
            if count:
                handshake_found = int(count.group(1)) > 0
            else:
                lowered = output.lower()
                handshake_found = 'handshake' in lowered and 'wpa' in lowered
    except subprocess.TimeoutExpired:
        pass  # aircrack-ng timed out, assume no handshake yet
    except Exception as e:
        print(f"[WiFi] Error checking handshake: {e}", flush=True)

    return jsonify({
        'status': capture_state(),
        'file_exists': True,
        'file_size': file_size,
        'file': capture_file,
        'handshake_found': handshake_found
    })
# PMKID Capture using hcxdumptool
# Handle of the running hcxdumptool process, or None when no capture is active.
pmkid_process = None
# Guards reads and writes of pmkid_process in the PMKID capture/stop routes.
pmkid_lock = threading.Lock()
@app.route('/wifi/pmkid/capture', methods=['POST'])
def capture_pmkid():
    """Start PMKID capture using hcxdumptool.

    JSON body: 'bssid' (required, validated), 'channel' (optional) and
    'interface' (defaults to the current monitor interface). The BSSID is
    written to a filter file passed to hcxdumptool via --filterlist_ap, and
    the capture goes to /tmp/intercept_pmkid_<bssid without colons>.pcapng.
    Returns a JSON status object with the capture file path.
    """
    global pmkid_process

    data = request.json or {}
    target_bssid = data.get('bssid')
    channel = data.get('channel')
    interface = data.get('interface') or wifi_monitor_interface

    if not target_bssid:
        return jsonify({'status': 'error', 'message': 'BSSID required'})
    if not is_valid_mac(target_bssid):
        return jsonify({'status': 'error', 'message': 'Invalid BSSID format'})
    if not interface:
        # Otherwise Popen fails on a None argument with an opaque TypeError
        return jsonify({'status': 'error', 'message': 'No monitor interface'})

    with pmkid_lock:
        if pmkid_process and pmkid_process.poll() is None:
            return jsonify({'status': 'error', 'message': 'PMKID capture already running'})

        bssid_hex = target_bssid.replace(':', '')
        capture_path = f'/tmp/intercept_pmkid_{bssid_hex}.pcapng'
        filter_file = f'/tmp/pmkid_filter_{bssid_hex}'

        cmd = [
            'hcxdumptool',
            '-i', interface,
            '-o', capture_path,
            '--filterlist_ap', filter_file,
            '--filtermode', '2',  # whitelist mode
            '--enable_status', '1'
        ]
        if channel:
            cmd.extend(['-c', str(channel)])

        try:
            # Inside the try so a write failure is reported as JSON too
            with open(filter_file, 'w') as f:
                f.write(bssid_hex.lower())

            # Nothing reads this process's output; a PIPE would eventually
            # fill up and block hcxdumptool, so discard it instead.
            pmkid_process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            return jsonify({'status': 'started', 'file': capture_path})
        except FileNotFoundError:
            return jsonify({'status': 'error', 'message': 'hcxdumptool not found. Install with: apt install hcxdumptool'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/wifi/pmkid/status', methods=['POST'])
def check_pmkid_status():
    """Report whether the given capture file yields a PMKID hash.

    The 'file' value must start with /tmp/intercept_pmkid_ and contain no
    '..'. hcxpcapngtool converts the capture to a .22000 hash file; a
    non-empty result counts as a hit. If the tool is not installed, a
    capture larger than 1000 bytes is treated as a hit.
    """
    capture_file = request.json.get('file', '')

    looks_valid = capture_file.startswith('/tmp/intercept_pmkid_') and '..' not in capture_file
    if not looks_valid:
        return jsonify({'status': 'error', 'message': 'Invalid capture file path'})

    if not os.path.exists(capture_file):
        return jsonify({'pmkid_found': False, 'file_exists': False})

    size_bytes = os.path.getsize(capture_file)

    found = False
    try:
        hash_path = capture_file.replace('.pcapng', '.22000')
        subprocess.run(
            ['hcxpcapngtool', '-o', hash_path, capture_file],
            capture_output=True,
            text=True,
            timeout=10
        )
        # A non-empty hash file means something was extracted
        if os.path.exists(hash_path) and os.path.getsize(hash_path) > 0:
            found = True
    except FileNotFoundError:
        # hcxpcapngtool not installed: rough size heuristic instead
        found = size_bytes > 1000
    except Exception:
        pass

    return jsonify({
        'pmkid_found': found,
        'file_exists': True,
        'file_size': size_bytes,
        'file': capture_file
    })
@app.route('/wifi/pmkid/stop', methods=['POST'])
def stop_pmkid():
    """Stop the PMKID capture process, if one is recorded.

    Sends a terminate request, waits up to five seconds, and falls back to
    a kill when the process does not exit in time. Always answers with
    ``{'status': 'stopped'}``.
    """
    global pmkid_process

    with pmkid_lock:
        if pmkid_process:
            pmkid_process.terminate()
            try:
                pmkid_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Only the timeout is an expected failure here; a bare
                # except would also hide KeyboardInterrupt/SystemExit.
                pmkid_process.kill()
                # Reap the killed child so it does not linger as a zombie.
                pmkid_process.wait()
            pmkid_process = None

    return jsonify({'status': 'stopped'})
@app.route('/wifi/networks')
def get_wifi_networks():
    """Return the WiFi state collected so far as one JSON object.

    The payload carries the known networks and clients (dict values turned
    into lists), the recorded handshakes and the current monitor interface.
    """
    snapshot = {
        'networks': list(wifi_networks.values()),
        'clients': list(wifi_clients.values()),
        'handshakes': wifi_handshakes,
        'monitor_interface': wifi_monitor_interface,
    }
    return jsonify(snapshot)
@app.route('/wifi/stream')
def stream_wifi():
    """Serve WiFi events as a Server-Sent Events stream.

    Each message taken from ``wifi_queue`` is sent as one ``data:`` frame;
    when the queue stays empty for a second a keepalive frame is sent
    instead.
    """
    def generate():
        import json

        while True:
            try:
                payload = wifi_queue.get(timeout=1)
            except queue.Empty:
                payload = {'type': 'keepalive'}
            yield f"data: {json.dumps(payload)}\n\n"

    response = Response(generate(), mimetype='text/event-stream')
    # Headers that keep intermediaries from caching or buffering the stream.
    for header, value in (
        ('Cache-Control', 'no-cache'),
        ('X-Accel-Buffering', 'no'),
        ('Connection', 'keep-alive'),
    ):
        response.headers[header] = value
    return response
# ============== BLUETOOTH RECONNAISSANCE ROUTES ==============
def get_manufacturer(mac):
    """Return the manufacturer for a MAC address, or ``'Unknown'``.

    The first eight characters of ``mac``, upper-cased, are used as the
    lookup key into ``OUI_DATABASE``.
    """
    oui_key = mac[:8].upper()
    return OUI_DATABASE.get(oui_key, 'Unknown')
# Ordered (category, substrings) table. Categories are tried top to bottom
# and the first one with a substring found in the lower-cased device name
# wins, so the order here is part of the behavior.
_BT_NAME_PATTERNS = (
    # Audio devices - extensive patterns
    ('audio', (
        'airpod', 'earbud', 'headphone', 'headset', 'speaker', 'audio', 'beats', 'bose',
        'jbl', 'sony wh', 'sony wf', 'sennheiser', 'jabra', 'soundcore', 'anker', 'buds',
        'earphone', 'pod', 'soundbar', 'subwoofer', 'amp', 'dac', 'hifi', 'stereo',
        'skullcandy', 'marshall', 'b&o', 'bang', 'olufsen', 'harman', 'akg', 'shure',
        'audio-technica', 'plantronics', 'poly', 'soundlink', 'soundsport', 'quietcomfort',
        'freebuds', 'galaxy buds', 'wf-', 'wh-', 'linkbuds', 'momentum', 'px7', 'px8',
        'liberty', 'life', 'enco', 'oppo enco', 'nothing ear', 'ear (', 'studio buds',
        'powerbeats', 'solo', 'flex', 'tour', 'tune', 'reflect', 'endurance', 'soundpeats',
    )),
    # Wearables - watches, bands, fitness
    ('wearable', (
        'watch', 'band', 'fitbit', 'garmin', 'mi band', 'miband', 'amazfit', 'huawei band',
        'galaxy watch', 'gear', 'versa', 'sense', 'charge', 'inspire', 'vivosmart',
        'vivoactive', 'venu', 'forerunner', 'fenix', 'instinct', 'polar', 'suunto',
        'whoop', 'oura', 'ring', 'wristband', 'fitness', 'tracker', 'activity',
        'apple watch', 'iwatch', 'samsung watch', 'ticwatch', 'fossil', 'withings',
    )),
    # Phones - mobile devices
    ('phone', (
        'iphone', 'galaxy', 'pixel', 'phone', 'android', 'oneplus', 'huawei', 'xiaomi',
        'redmi', 'poco', 'realme', 'oppo', 'vivo', 'motorola', 'moto', 'nokia', 'lg',
        'sony xperia', 'xperia', 'asus', 'rog phone', 'zenfone', 'nothing phone',
        'samsung sm-', 'sm-g', 'sm-a', 'sm-s', 'sm-n', 'sm-f',
    )),
    # Trackers - location devices
    ('tracker', (
        'airtag', 'tile', 'smarttag', 'chipolo', 'find my', 'findmy', 'locator',
        'gps', 'pet tracker', 'key finder', 'nut', 'trackr', 'pebblebee', 'cube',
    )),
    # Input devices - keyboards, mice, controllers
    ('input', (
        'keyboard', 'mouse', 'controller', 'gamepad', 'joystick', 'remote', 'trackpad',
        'magic mouse', 'magic keyboard', 'mx master', 'mx keys', 'logitech', 'razer',
        'dualshock', 'dualsense', 'xbox', 'switch pro', 'joycon', 'joy-con', '8bitdo',
        'steelseries', 'corsair', 'hyperx',
    )),
    # Media devices - TVs, streaming
    ('media', (
        'tv', 'roku', 'chromecast', 'firestick', 'fire tv', 'appletv', 'apple tv',
        'nvidia shield', 'android tv', 'smart tv', 'lg tv', 'samsung tv', 'sony tv',
        'tcl', 'hisense', 'vizio', 'projector', 'beam', 'soundbase',
    )),
    # Computers - laptops, desktops
    ('computer', (
        'macbook', 'imac', 'mac mini', 'mac pro', 'thinkpad', 'latitude', 'xps',
        'pavilion', 'envy', 'spectre', 'surface', 'chromebook', 'ideapad', 'legion',
        'predator', 'rog', 'alienware', 'desktop', 'laptop', 'notebook', 'pc',
    )),
)

# Manufacturer names (lower-case, exact match) that imply a device type.
_BT_MANUFACTURER_TYPES = (
    ('audio', (
        'bose', 'jbl', 'sony', 'sennheiser', 'jabra', 'beats', 'bang & olufsen',
        'audio-technica', 'plantronics', 'skullcandy', 'anker',
    )),
    ('wearable', ('fitbit', 'garmin')),
    ('tracker', ('tile',)),
    ('input', ('logitech',)),
)

# Bits 8-12 of the class-of-device value select one of these categories.
_BT_MAJOR_CLASS_TYPES = {
    1: 'computer',   # Computer
    2: 'phone',      # Phone
    4: 'audio',      # Audio/Video
    5: 'input',      # Peripheral
    6: 'imaging',    # Imaging
    7: 'wearable',   # Wearable
}


def classify_bt_device(name, device_class, services, manufacturer=None):
    """Guess a Bluetooth device category from whatever is known about it.

    Checks run in this order and the first hit wins: substring match on
    the lower-cased ``name``, exact match on the lower-cased
    ``manufacturer``, then the major class taken from ``device_class``.
    ``services`` is accepted but not consulted.

    Returns:
        One of ``'audio'``, ``'wearable'``, ``'phone'``, ``'tracker'``,
        ``'input'``, ``'media'``, ``'computer'``, ``'imaging'`` or
        ``'other'``.
    """
    lowered_name = (name or '').lower()
    lowered_mfr = (manufacturer or '').lower()

    for category, needles in _BT_NAME_PATTERNS:
        if any(needle in lowered_name for needle in needles):
            return category

    # Use manufacturer to infer type
    for category, makers in _BT_MANUFACTURER_TYPES:
        if lowered_mfr in makers:
            return category

    # Check device class if available
    if device_class:
        major_class = (device_class >> 8) & 0x1F
        return _BT_MAJOR_CLASS_TYPES.get(major_class, 'other')

    return 'other'
def detect_tracker(mac, name, manufacturer_data=None):
    """Detect if device is a known tracker (AirTag, Tile, etc).

    Args:
        mac: Device MAC address; its first five characters, upper-cased,
            are compared against the known tracker prefix lists. ``None``
            is treated as an empty address.
        name: Advertised device name (may be ``None`` or empty).
        manufacturer_data: Optional raw manufacturer-specific bytes. Only
            used to confirm an AirTag prefix match.

    Returns:
        A dict with ``type``, ``name`` and ``risk`` keys, or ``None`` when
        nothing matched.
    """
    mac_prefix = (mac or '')[:5].upper()

    # AirTag detection (Apple Find My)
    if any(mac_prefix.startswith(p) for p in AIRTAG_PREFIXES):
        # Apple's company identifier 0x004C is sent little-endian, i.e. the
        # two bytes 0x4C 0x00. The previous literal was double-escaped and
        # therefore searched for the eight ASCII characters "\x4c\x00";
        # that form is still accepted so existing callers keep working.
        if manufacturer_data and (
            b'\x4c\x00' in manufacturer_data
            or b'\\x4c\\x00' in manufacturer_data
        ):
            return {'type': 'airtag', 'name': 'Apple AirTag', 'risk': 'high'}

    # Tile detection
    if any(mac_prefix.startswith(p) for p in TILE_PREFIXES):
        return {'type': 'tile', 'name': 'Tile Tracker', 'risk': 'medium'}

    # Samsung SmartTag
    if any(mac_prefix.startswith(p) for p in SAMSUNG_TRACKER):
        return {'type': 'smarttag', 'name': 'Samsung SmartTag', 'risk': 'medium'}

    # Name-based detection
    name_lower = (name or '').lower()
    if 'airtag' in name_lower:
        return {'type': 'airtag', 'name': 'Apple AirTag', 'risk': 'high'}
    if 'tile' in name_lower:
        return {'type': 'tile', 'name': 'Tile Tracker', 'risk': 'medium'}
    if 'smarttag' in name_lower:
        return {'type': 'smarttag', 'name': 'Samsung SmartTag', 'risk': 'medium'}
    if 'chipolo' in name_lower:
        return {'type': 'chipolo', 'name': 'Chipolo Tracker', 'risk': 'medium'}

    return None
def detect_bt_interfaces():
    """List the Bluetooth interfaces available on this machine.

    On Linux the output of ``hciconfig`` is split into one chunk per
    ``hciN:`` header and each interface is reported as ``up`` or ``down``.
    On macOS a single placeholder entry is returned. Any other platform
    yields an empty list.
    """
    import platform

    system_name = platform.system()

    if system_name == 'Darwin':  # macOS
        # macOS uses different Bluetooth stack
        return [{
            'name': 'default',
            'type': 'macos',
            'status': 'available'
        }]

    found = []
    if system_name != 'Linux':
        return found

    try:
        import re

        # Use hciconfig to list interfaces
        listing = subprocess.run(
            ['hciconfig'], capture_output=True, text=True, timeout=5
        ).stdout

        # "UP RUNNING" appears on a separate line, so work on whole
        # per-interface chunks rather than single lines.
        for chunk in re.split(r'(?=^hci\d+:)', listing, flags=re.MULTILINE):
            if not chunk.strip():
                continue
            header = re.match(r'(hci\d+):', chunk.split('\n')[0])
            if not header:
                continue
            running = 'UP RUNNING' in chunk or '\tUP ' in chunk
            found.append({
                'name': header.group(1),
                'type': 'hci',
                'status': 'up' if running else 'down'
            })
    except FileNotFoundError:
        # hciconfig is not installed; report no interfaces.
        pass
    except Exception as e:
        print(f"[BT] Error detecting interfaces: {e}")

    return found
@app.route('/bt/reload-oui', methods=['POST'])
def reload_oui_database():
    """Re-read the OUI database file and swap it in when it loads.

    Responds with the number of entries on success, or an error object
    when ``load_oui_database()`` returns nothing usable.
    """
    global OUI_DATABASE

    fresh_db = load_oui_database()
    if not fresh_db:
        return jsonify({'status': 'error', 'message': 'Could not load oui_database.json'})

    OUI_DATABASE = fresh_db
    return jsonify({'status': 'success', 'entries': len(OUI_DATABASE)})
@app.route('/bt/interfaces')
def get_bt_interfaces():
    """Report Bluetooth interfaces, tool availability and the active interface.

    Tool availability is whatever ``check_tool`` returns for each of the
    command-line tools listed below.
    """
    interfaces = detect_bt_interfaces()

    tool_names = ('hcitool', 'bluetoothctl', 'hciconfig', 'l2ping', 'sdptool')
    tools = {tool: check_tool(tool) for tool in tool_names}

    return jsonify({
        'interfaces': interfaces,
        'tools': tools,
        'current_interface': bt_interface
    })
def parse_hcitool_output(line):
    """Turn one tab-separated ``hcitool scan`` line into a device dict.

    Expected shape after stripping: ``AA:BB:CC:DD:EE:FF<TAB>Device Name``.
    Returns ``{'mac': ..., 'name': ...}`` when the first field is 17
    characters long and contains a colon; otherwise ``None``.
    """
    fields = line.strip().split('\t')
    if len(fields) < 2:
        return None

    address = fields[0].strip()
    label = fields[1].strip()

    if len(address) != 17 or ':' not in address:
        return None
    return {'mac': address, 'name': label}
def _publish_bt_device(mac, name, log=False):
    """Store one discovered device in ``bt_devices`` and queue it for clients."""
    manufacturer = get_manufacturer(mac)
    device = {
        'mac': mac,
        'name': name or '[Unknown]',
        'manufacturer': manufacturer,
        'type': classify_bt_device(name, None, None, manufacturer),
        'rssi': None,
        'last_seen': time.time()
    }

    # Check for tracker
    tracker = detect_tracker(mac, name)
    if tracker:
        device['tracker'] = tracker

    is_new = mac not in bt_devices
    bt_devices[mac] = device

    queue_data = {
        **device,
        'type': 'device',  # Must come after **device to not be overwritten
        'device_type': device.get('type', 'other'),
        'action': 'new' if is_new else 'update',
    }
    if log:
        print(f"[BT] Queuing device: {mac} - {name}")
    bt_queue.put(queue_data)


def stream_bt_scan(process, scan_mode):
    """Stream Bluetooth scan output to queue.

    Runs in a worker thread. Reads the scanner's output, records every
    device it reports and pushes device/status/error events to
    ``bt_queue``. When the scan ends it waits for the process, reports a
    non-zero exit code and clears ``bt_process`` if it still refers to
    this scan.

    Args:
        process: The ``subprocess.Popen`` object of the scanner. For
            ``bluetoothctl`` it must carry the pty master descriptor in
            its ``_master_fd`` attribute.
        scan_mode: ``'hcitool'`` (read the stdout pipe) or
            ``'bluetoothctl'`` (read the pty).
    """
    global bt_process

    # NOTE: this function deliberately uses the module-level os/select/re/
    # time imports. A function-local "import os" further down used to make
    # ``os`` a local name for the whole function, so the O_NONBLOCK setup
    # below failed with UnboundLocalError and was silently skipped.
    stderr_pipe = getattr(process, 'stderr', None)

    try:
        bt_queue.put({'type': 'status', 'text': 'started'})

        # Set up non-blocking stderr reading (pipe-based scans only; in
        # pty mode there is no separate stderr pipe).
        if stderr_pipe is not None:
            try:
                import fcntl
                fd = stderr_pipe.fileno()
                fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
            except Exception:
                # Best effort; e.g. fcntl is unavailable on this platform.
                pass

        if scan_mode == 'hcitool':
            # hcitool lescan output
            for line in iter(process.stdout.readline, b''):
                line = line.decode('utf-8', errors='replace').strip()
                if not line or 'LE Scan' in line:
                    continue

                # Parse BLE device: "<mac> <name...>"
                parts = line.split()
                if len(parts) >= 1 and ':' in parts[0]:
                    mac = parts[0]
                    name = ' '.join(parts[1:]) if len(parts) > 1 else ''
                    _publish_bt_device(mac, name)

        elif scan_mode == 'bluetoothctl':
            # bluetoothctl scan output - read from pty
            master_fd = getattr(process, '_master_fd', None)
            # Compare against None: descriptor 0 is a valid fd.
            if master_fd is None:
                bt_queue.put({'type': 'error', 'text': 'bluetoothctl pty not available'})
                return

            buffer = ''
            while process.poll() is None:
                # Check if data available
                readable, _, _ = select.select([master_fd], [], [], 1.0)
                if readable:
                    try:
                        data = os.read(master_fd, 4096)
                        if not data:
                            break
                        buffer += data.decode('utf-8', errors='replace')

                        # Process complete lines
                        while '\n' in buffer:
                            line, buffer = buffer.split('\n', 1)
                            line = line.strip()

                            # Remove ANSI escape codes
                            line = re.sub(r'\x1b\[[0-9;]*m', '', line)
                            line = re.sub(r'\x1b\[\?.*?[a-zA-Z]', '', line)
                            line = re.sub(r'\x1b\[K', '', line)  # Clear line escape
                            line = re.sub(r'\r', '', line)  # Remove carriage returns

                            # Debug: print what we're receiving
                            if line and 'Device' in line:
                                print(f"[BT] bluetoothctl: {line}")

                            # Parse [NEW] Device or [CHG] Device lines
                            # Format: [NEW] Device AA:BB:CC:DD:EE:FF DeviceName
                            if 'Device' in line:
                                match = re.search(r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})\s*(.*)', line)
                                if match:
                                    mac = match.group(1).upper()
                                    name = match.group(2).strip()
                                    _publish_bt_device(mac, name, log=True)
                    except OSError:
                        # The pty went away (scanner exited or fd closed).
                        break

            # Close master_fd
            try:
                os.close(master_fd)
            except OSError:
                pass

    except Exception as e:
        bt_queue.put({'type': 'error', 'text': str(e)})
    finally:
        # Capture any remaining stderr
        try:
            remaining_stderr = stderr_pipe.read() if stderr_pipe is not None else None
            if remaining_stderr:
                stderr_text = remaining_stderr.decode('utf-8', errors='replace').strip()
                if stderr_text:
                    bt_queue.put({'type': 'error', 'text': f'Bluetooth scan: {stderr_text}'})
        except Exception:
            pass

        # Check exit code
        process.wait()
        exit_code = process.returncode
        if exit_code != 0 and exit_code is not None:
            bt_queue.put({'type': 'error', 'text': f'Bluetooth scan exited with code {exit_code}'})

        bt_queue.put({'type': 'status', 'text': 'stopped'})
        with bt_lock:
            # Only clear the slot if it still belongs to this scan; a newer
            # scan may already have been started.
            if bt_process is process:
                bt_process = None
def _read_scan_pipe(pipe):
    """Return the decoded, stripped content of a Popen pipe ('' if there is none)."""
    if pipe is None:
        return ''
    return pipe.read().decode('utf-8', errors='replace').strip()


@app.route('/bt/scan/start', methods=['POST'])
def start_bt_scan():
    """Start Bluetooth scanning.

    JSON body keys (all optional):
        mode: ``'hcitool'`` (default) or ``'bluetoothctl'``.
        interface: HCI interface name, default ``'hci0'``.
        scan_ble: For hcitool, run ``lescan --duplicates`` when true
            (default) and a classic ``scan`` otherwise.

    The ``duration`` and ``scan_classic`` keys that clients may send are
    not used by this route.

    Returns:
        ``{'status': 'started', ...}`` once the scanner has survived a
        short start-up check, otherwise ``{'status': 'error', ...}`` with
        a human-readable message.
    """
    global bt_process, bt_devices, bt_interface

    with bt_lock:
        # Check if process is actually still running (not just set)
        if bt_process:
            if bt_process.poll() is None:
                # Process is actually running
                return jsonify({'status': 'error', 'message': 'Scan already running'})
            # Process died, clear the state
            bt_process = None

        # A missing or non-object JSON body falls back to the defaults.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        scan_mode = data.get('mode', 'hcitool')
        interface = data.get('interface', 'hci0')
        scan_ble = data.get('scan_ble', True)

        bt_interface = interface
        bt_devices = {}

        # Clear queue
        while True:
            try:
                bt_queue.get_nowait()
            except queue.Empty:
                break

        master_fd = None
        try:
            if scan_mode == 'hcitool':
                if scan_ble:
                    cmd = ['hcitool', '-i', interface, 'lescan', '--duplicates']
                else:
                    cmd = ['hcitool', '-i', interface, 'scan']

                bt_process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE
                )

            elif scan_mode == 'bluetoothctl':
                # Use bluetoothctl for BLE scanning with pty for proper output
                master_fd, slave_fd = pty.openpty()
                bt_process = subprocess.Popen(
                    ['bluetoothctl'],
                    stdin=slave_fd,
                    stdout=slave_fd,
                    stderr=slave_fd,
                    close_fds=True
                )
                os.close(slave_fd)

                # Store master_fd for reading
                bt_process._master_fd = master_fd

                # Wait for bluetoothctl to initialize
                time.sleep(0.5)

                # Power on and start LE scan (compatible with Bluetooth 5.x)
                os.write(master_fd, b'power on\n')
                time.sleep(0.3)
                os.write(master_fd, b'scan on\n')

            else:
                return jsonify({'status': 'error', 'message': f'Unknown scan mode: {scan_mode}'})

            # Wait briefly to check if process fails immediately
            time.sleep(0.5)
            if bt_process.poll() is not None:
                # Process already exited - capture error. In pty mode the
                # Popen object has no stdout/stderr pipes, so the helper
                # returns '' instead of failing on ``None.read()``.
                stderr_output = _read_scan_pipe(bt_process.stderr)
                stdout_output = _read_scan_pipe(bt_process.stdout)
                exit_code = bt_process.returncode
                bt_process = None

                if master_fd is not None:
                    # No reader thread will be started, so release the pty here.
                    try:
                        os.close(master_fd)
                    except OSError:
                        pass

                error_msg = stderr_output or stdout_output or f'Process exited with code {exit_code}'

                # Common error explanations and auto-recovery
                if 'No such device' in error_msg or 'hci0' in error_msg.lower():
                    error_msg = f'Bluetooth interface "{interface}" not found or not available.'
                elif 'Operation not permitted' in error_msg or 'Permission denied' in error_msg:
                    error_msg = 'Permission denied. Try running with sudo or add user to bluetooth group.'
                elif 'busy' in error_msg.lower():
                    error_msg = f'Bluetooth interface "{interface}" is busy. Stop other Bluetooth operations first.'
                elif 'set scan parameters failed' in error_msg.lower() or 'input/output error' in error_msg.lower():
                    # Try to auto-reset the adapter
                    try:
                        subprocess.run(['hciconfig', interface, 'down'], capture_output=True, timeout=5)
                        subprocess.run(['hciconfig', interface, 'up'], capture_output=True, timeout=5)
                        error_msg = 'Adapter error - attempted auto-reset. Click "Reset Adapter" and try again.'
                    except Exception:
                        error_msg = 'Bluetooth adapter I/O error. Click "Reset Adapter" to reset the adapter and try again.'

                return jsonify({'status': 'error', 'message': error_msg})

            # Start streaming thread
            thread = threading.Thread(target=stream_bt_scan, args=(bt_process, scan_mode))
            thread.daemon = True
            thread.start()

            bt_queue.put({'type': 'info', 'text': f'Started {scan_mode} scan on {interface}'})
            return jsonify({'status': 'started', 'mode': scan_mode, 'interface': interface})

        except FileNotFoundError as e:
            tool_name = e.filename or scan_mode
            return jsonify({'status': 'error', 'message': f'Tool "{tool_name}" not found. Install required Bluetooth tools.'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/bt/scan/stop', methods=['POST'])
def stop_bt_scan():
    """Terminate the running Bluetooth scan, if there is one.

    Answers ``not_running`` when no scan process is recorded; otherwise
    terminates it (killing it if it does not exit within three seconds),
    clears the recorded process and answers ``stopped``.
    """
    global bt_process

    with bt_lock:
        if not bt_process:
            return jsonify({'status': 'not_running'})

        bt_process.terminate()
        try:
            bt_process.wait(timeout=3)
        except subprocess.TimeoutExpired:
            bt_process.kill()

        bt_process = None
        return jsonify({'status': 'stopped'})
@app.route('/bt/reset', methods=['POST'])
def reset_bt_adapter():
    """Reset Bluetooth adapter and clear scan state.

    JSON body key (optional): ``interface`` - HCI interface name, default
    ``'hci0'``.

    Steps: stop any recorded scan process, kill stray ``hcitool`` /
    ``bluetoothctl`` processes, ``rfkill unblock bluetooth``, bring the
    interface down and up with ``hciconfig`` (through ``sudo -n`` when not
    running as root) and, if it is still not up, power-cycle it through
    ``bluetoothctl``.

    Returns:
        JSON with ``status`` (``success``, ``warning`` or ``error``),
        ``message`` and, unless an error occurred, ``is_up``.
    """
    global bt_process

    # A missing or non-object JSON body falls back to the default interface.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    interface = data.get('interface', 'hci0')

    with bt_lock:
        # Force clear the process state
        if bt_process:
            try:
                bt_process.terminate()
                bt_process.wait(timeout=2)
            except Exception:
                try:
                    bt_process.kill()
                except Exception:
                    pass
            bt_process = None

    def adapter_is_up():
        """Ask hciconfig whether the interface reports UP RUNNING."""
        status = subprocess.run(['hciconfig', interface], capture_output=True, text=True, timeout=5)
        return 'UP RUNNING' in status.stdout

    # Reset the adapter
    try:
        import time
        import os

        # Kill any processes that might be using the adapter
        subprocess.run(['pkill', '-f', 'hcitool'], capture_output=True, timeout=2)
        subprocess.run(['pkill', '-f', 'bluetoothctl'], capture_output=True, timeout=2)
        time.sleep(0.5)

        # Root can call hciconfig directly; otherwise try non-interactive sudo.
        privilege_prefix = [] if os.geteuid() == 0 else ['sudo', '-n']

        # Try rfkill unblock first
        subprocess.run(['rfkill', 'unblock', 'bluetooth'], capture_output=True, timeout=5)

        # Reset the adapter with a delay between down and up
        subprocess.run(privilege_prefix + ['hciconfig', interface, 'down'], capture_output=True, text=True, timeout=5)
        time.sleep(1)
        subprocess.run(privilege_prefix + ['hciconfig', interface, 'up'], capture_output=True, text=True, timeout=5)

        time.sleep(0.5)

        # Check if adapter is up
        is_up = adapter_is_up()

        # If still not up, try bluetoothctl
        if not is_up:
            subprocess.run(['bluetoothctl', 'power', 'off'], capture_output=True, timeout=5)
            time.sleep(1)
            subprocess.run(['bluetoothctl', 'power', 'on'], capture_output=True, timeout=5)
            time.sleep(0.5)
            is_up = adapter_is_up()

        if is_up:
            bt_queue.put({'type': 'info', 'text': f'Bluetooth adapter {interface} reset successfully'})
        else:
            bt_queue.put({'type': 'error', 'text': f'Adapter {interface} may need manual reset. Try: sudo hciconfig {interface} up'})

        return jsonify({
            'status': 'success' if is_up else 'warning',
            'message': f'Adapter {interface} reset' if is_up else f'Reset attempted but adapter still down. Run: sudo hciconfig {interface} up',
            'is_up': is_up
        })

    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@app.route('/bt/enum', methods=['POST'])
def enum_bt_services():
    """List the services a Bluetooth device exposes, via ``sdptool browse``.

    Requires a ``mac`` key in the JSON body. The parsed services are also
    stored in ``bt_services`` under that MAC. Tool timeouts, a missing
    ``sdptool`` and any other failure are reported as ``status: error``.
    """
    payload = request.json
    target_mac = payload.get('mac')

    if not target_mac:
        return jsonify({'status': 'error', 'message': 'Target MAC required'})

    # Lines that add a single field to the service currently being built.
    field_prefixes = (
        ('Service Description:', 'description'),
        ('Service Provider:', 'provider'),
    )

    try:
        # Try sdptool for classic BT
        browse = subprocess.run(
            ['sdptool', 'browse', target_mac],
            capture_output=True, text=True, timeout=30
        )

        services = []
        record = {}

        for raw_line in browse.stdout.split('\n'):
            text = raw_line.strip()

            if text.startswith('Service Name:'):
                # A new name starts a new record; flush the previous one.
                if record:
                    services.append(record)
                record = {'name': text.partition(':')[2].strip()}
                continue

            for prefix, key in field_prefixes:
                if text.startswith(prefix):
                    record[key] = text.partition(':')[2].strip()
                    break
            else:
                if 'Protocol Descriptor' in text:
                    record['protocol'] = text

        if record:
            services.append(record)

        bt_services[target_mac] = services

        return jsonify({
            'status': 'success',
            'mac': target_mac,
            'services': services
        })

    except subprocess.TimeoutExpired:
        return jsonify({'status': 'error', 'message': 'Connection timed out'})
    except FileNotFoundError:
        return jsonify({'status': 'error', 'message': 'sdptool not found'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@app.route('/bt/devices')
def get_bt_devices():
    """Return the Bluetooth devices and beacons seen so far, plus the interface in use."""
    snapshot = {
        'devices': list(bt_devices.values()),
        'beacons': list(bt_beacons.values()),
        'interface': bt_interface,
    }
    return jsonify(snapshot)
@app.route('/bt/stream')
def stream_bt():
    """Serve Bluetooth events as a Server-Sent Events stream.

    Messages from ``bt_queue`` are forwarded one per ``data:`` frame and
    logged to stdout; a keepalive frame is sent whenever the queue stays
    empty for a second.
    """
    print("[BT Stream] Client connected")

    def generate():
        import json

        print("[BT Stream] Generator started, waiting for queue...")
        while True:
            try:
                event = bt_queue.get(timeout=1)
            except queue.Empty:
                yield f"data: {json.dumps({'type': 'keepalive'})}\n\n"
                continue

            event_type = event.get('type')
            print(f"[BT Stream] Got from queue: {event_type}")
            if event_type == 'device':
                print(f"[BT Stream] Sending device: {event.get('mac')}")
            yield f"data: {json.dumps(event)}\n\n"

    response = Response(generate(), mimetype='text/event-stream')
    # Headers that keep intermediaries from caching or buffering the stream.
    for header, value in (
        ('Cache-Control', 'no-cache'),
        ('X-Accel-Buffering', 'no'),
        ('Connection', 'keep-alive'),
    ):
        response.headers[header] = value
    return response
# ============================================
# AIRCRAFT (ADS-B) ROUTES
# ============================================
@app.route('/adsb/tools')
def check_adsb_tools():
    """Report which ADS-B decoders can be found on PATH.

    ``dump1090`` is true when either ``dump1090`` or
    ``dump1090-mutability`` is found.
    """
    dump1090_names = ('dump1090', 'dump1090-mutability')
    has_dump1090 = any(shutil.which(binary) is not None for binary in dump1090_names)
    has_rtl_adsb = shutil.which('rtl_adsb') is not None

    return jsonify({
        'dump1090': has_dump1090,
        'rtl_adsb': has_rtl_adsb
    })
@app.route('/adsb/start', methods=['POST'])
def start_adsb():
    """Start ADS-B tracking.

    JSON body keys (optional): ``gain`` (default ``'40'``) and ``device``
    (default ``'0'``). Prefers ``dump1090`` / ``dump1090-mutability`` and
    falls back to ``rtl_adsb``. On success a daemon thread running
    ``parse_adsb_output`` is started for the new process.

    Returns:
        ``{'status': 'started'}`` or ``{'status': 'error', 'message': ...}``.
    """
    global adsb_process

    with adsb_lock:
        if adsb_process and adsb_process.poll() is None:
            return jsonify({'status': 'error', 'message': 'ADS-B already running'})

        # A missing or non-object JSON body falls back to the defaults.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # JSON clients may send numbers; Popen only accepts string
        # arguments, so both values are normalised to str.
        gain = str(data.get('gain', '40'))
        device = str(data.get('device', '0'))

        # Try dump1090 first, fall back to rtl_adsb
        dump1090_path = shutil.which('dump1090') or shutil.which('dump1090-mutability')

        if dump1090_path:
            cmd = [dump1090_path, '--raw', '--net', '--gain', gain, '--device-index', device]
        elif shutil.which('rtl_adsb'):
            cmd = ['rtl_adsb', '-g', gain, '-d', device]
        else:
            return jsonify({'status': 'error', 'message': 'No ADS-B decoder found (install dump1090 or rtl_adsb)'})

        try:
            adsb_process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=1,
                universal_newlines=True
            )

            # Start parsing thread
            thread = threading.Thread(target=parse_adsb_output, args=(adsb_process,), daemon=True)
            thread.start()

            return jsonify({'status': 'started'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/adsb/stop', methods=['POST'])
def stop_adsb():
    """Stop ADS-B tracking.

    Terminates the decoder process if one is recorded (killing it when it
    does not exit within five seconds) and empties the aircraft table.
    Always answers ``{'status': 'stopped'}``.
    """
    global adsb_process, adsb_aircraft

    with adsb_lock:
        if adsb_process:
            adsb_process.terminate()
            try:
                adsb_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Only the timeout is an expected failure here; a bare
                # except would also hide KeyboardInterrupt/SystemExit.
                adsb_process.kill()
                # Reap the killed child so it does not linger as a zombie.
                adsb_process.wait()
            adsb_process = None

        adsb_aircraft = {}

    return jsonify({'status': 'stopped'})
@app.route('/adsb/stream')
def stream_adsb():
    """Serve ADS-B aircraft updates as a Server-Sent Events stream.

    Messages from ``adsb_queue`` are forwarded one per ``data:`` frame; a
    keepalive frame is sent whenever the queue stays empty for a second.
    """
    def generate():
        while True:
            try:
                payload = adsb_queue.get(timeout=1)
            except queue.Empty:
                payload = {'type': 'keepalive'}
            yield f"data: {json.dumps(payload)}\n\n"

    response = Response(generate(), mimetype='text/event-stream')
    for header, value in (
        ('Cache-Control', 'no-cache'),
        ('X-Accel-Buffering', 'no'),
    ):
        response.headers[header] = value
    return response
def _first_not_none(*candidates):
    """Return the first candidate that is not None (None if all are)."""
    for candidate in candidates:
        if candidate is not None:
            return candidate
    return None


def parse_adsb_output(process):
    """Parse ADS-B output and poll dump1090 JSON for decoded data.

    Runs in a worker thread for one decoder process. Two sources feed the
    shared ``adsb_aircraft`` table and ``adsb_queue``:

    * a helper thread that polls dump1090's ``aircraft.json`` once per
      second while ``process`` is alive and merges decoded fields, and
    * the raw ``*<hex>;`` frames on ``process.stdout``, which create a
      placeholder entry for an address not present in the table yet.

    Args:
        process: ``subprocess.Popen`` of the decoder, opened in text mode.
    """
    global adsb_aircraft
    import re
    import urllib.request
    import json as json_lib

    icao_pattern = re.compile(r'\*([0-9A-Fa-f]{6,14});')

    # Start a thread to poll dump1090's JSON endpoint for decoded positions
    def poll_dump1090_json():
        """Poll dump1090's aircraft.json for decoded lat/lon data."""
        json_urls = [
            'http://localhost:8080/data/aircraft.json',
            'http://localhost:30003/data/aircraft.json',
            'http://localhost:8080/dump1090/data/aircraft.json'
        ]
        working_url = None

        # Watch the process this thread was started for, not the global
        # slot: after a restart the global points at a different process
        # and the old poller would otherwise keep running.
        while process.poll() is None:
            try:
                # Find working URL on first success
                urls_to_try = [working_url] if working_url else json_urls
                for url in urls_to_try:
                    try:
                        with urllib.request.urlopen(url, timeout=2) as response:
                            data = json_lib.loads(response.read().decode())
                            working_url = url

                            for ac in data.get('aircraft', []):
                                icao = (ac.get('hex') or '').upper()
                                if not icao:
                                    continue

                                # Update aircraft with decoded position data.
                                # Fields are merged with an explicit None
                                # check so legitimate zeros (heading 0,
                                # lat/lon 0, altitude 0) are not replaced
                                # by the stale previous value.
                                aircraft = adsb_aircraft.get(icao, {'icao': icao})
                                aircraft.update({
                                    'icao': icao,
                                    'callsign': (ac.get('flight') or '').strip() or aircraft.get('callsign'),
                                    'altitude': _first_not_none(ac.get('altitude'), ac.get('alt_baro'), aircraft.get('altitude')),
                                    'speed': _first_not_none(ac.get('speed'), ac.get('gs'), aircraft.get('speed')),
                                    'heading': _first_not_none(ac.get('track'), aircraft.get('heading')),
                                    'lat': _first_not_none(ac.get('lat'), aircraft.get('lat')),
                                    'lon': _first_not_none(ac.get('lon'), aircraft.get('lon')),
                                    'squawk': ac.get('squawk') or aircraft.get('squawk'),
                                    'rssi': _first_not_none(ac.get('rssi'), aircraft.get('rssi'))
                                })
                                adsb_aircraft[icao] = aircraft

                                adsb_queue.put({
                                    'type': 'aircraft',
                                    **aircraft
                                })
                            break
                    except Exception:
                        # Endpoint not reachable or not JSON: try the next URL.
                        continue
            except Exception:
                pass
            time.sleep(1)

    # Start JSON polling thread
    json_thread = threading.Thread(target=poll_dump1090_json, daemon=True)
    json_thread.start()

    # Also parse raw output for immediate ICAO detection
    try:
        for line in process.stdout:
            line = line.strip()
            if not line:
                continue

            # Parse raw Mode S messages for quick ICAO detection
            match = icao_pattern.search(line)
            if match:
                raw = match.group(1)
                if len(raw) >= 6:
                    icao = raw[:6].upper()

                    # Create placeholder if not seen via JSON yet
                    if icao not in adsb_aircraft:
                        aircraft = {
                            'icao': icao,
                            'callsign': None,
                            'altitude': None,
                            'speed': None,
                            'heading': None,
                            'lat': None,
                            'lon': None
                        }
                        adsb_aircraft[icao] = aircraft
                        adsb_queue.put({
                            'type': 'aircraft',
                            **aircraft
                        })
    except Exception as e:
        print(f"[ADS-B] Parse error: {e}")
# ============================================
# SATELLITE ROUTES
# ============================================
@app.route('/satellite/predict', methods=['POST'])
def predict_passes():
    """Calculate satellite passes using skyfield for accurate orbital prediction.

    JSON body keys (all optional):
        lat, lon: Observer position, default 51.5074 / -0.1278.
        hours: Length of the prediction window starting now, default 24.
        minEl: Minimum peak elevation in degrees for a pass to be
            reported, default 10.
        satellites: Names to look up in ``TLE_SATELLITES``; unknown names
            are skipped.

    Returns:
        ``{'status': 'success', 'passes': [...]}`` sorted by start time, or
        a ``status: error`` object when skyfield cannot be imported. Each
        pass carries the start time, peak elevation, duration in minutes,
        a 30-point sky trajectory, a 60-point ground track, the
        satellite's position at request time and a display colour.
    """
    from datetime import datetime, timedelta

    try:
        from skyfield.api import load, wgs84, EarthSatellite
        from skyfield.almanac import find_discrete
    except ImportError:
        # Fallback if skyfield not installed
        return jsonify({
            'status': 'error',
            'message': 'skyfield library not installed. Run: pip install skyfield'
        })

    data = request.json
    lat = data.get('lat', 51.5074)
    lon = data.get('lon', -0.1278)
    hours = data.get('hours', 24)
    min_el = data.get('minEl', 10)
    satellites = data.get('satellites', ['ISS', 'NOAA-15', 'NOAA-18', 'NOAA-19'])

    passes = []
    # Display colour per satellite; names not listed fall back to '#00ff00'.
    colors = {'ISS': '#00ffff', 'NOAA-15': '#00ff00', 'NOAA-18': '#ff6600', 'NOAA-19': '#ff3366', 'METEOR-M2': '#9370DB'}

    ts = load.timescale()
    observer = wgs84.latlon(lat, lon)

    # Prediction window: from now until `hours` from now.
    t0 = ts.now()
    t1 = ts.utc(t0.utc_datetime() + timedelta(hours=hours))

    for sat_name in satellites:
        if sat_name not in TLE_SATELLITES:
            continue

        # Entries are indexed as [0]=name, [1]=TLE line 1, [2]=TLE line 2.
        tle_data = TLE_SATELLITES[sat_name]
        try:
            satellite = EarthSatellite(tle_data[1], tle_data[2], tle_data[0], ts)
        except Exception:
            continue

        # Find passes by checking when the satellite crosses the horizon
        # (0 degrees); the minimum elevation is applied later to the peak.
        def above_horizon(t):
            diff = satellite - observer
            topocentric = diff.at(t)
            alt, _, _ = topocentric.altaz()
            return alt.degrees > 0

        above_horizon.step_days = 1/720  # Check every 2 minutes

        try:
            times, events = find_discrete(t0, t1, above_horizon)
        except Exception:
            continue

        # Process rise/set pairs
        i = 0
        while i < len(times):
            # Find rise event (event = True). A leading set event, i.e. a
            # pass already in progress at t0, is skipped by this test.
            if i < len(events) and events[i]:
                rise_time = times[i]

                # Find corresponding set event
                set_time = None
                for j in range(i + 1, len(times)):
                    if not events[j]:
                        set_time = times[j]
                        # Continue scanning after the set event.
                        i = j
                        break

                if set_time is None:
                    # Rise without a set inside the window: ignore it.
                    i += 1
                    continue

                # Generate trajectory points between rise and set
                trajectory = []
                max_elevation = 0
                num_points = 30

                duration_seconds = (set_time.utc_datetime() - rise_time.utc_datetime()).total_seconds()

                for k in range(num_points):
                    # frac runs from 0.0 (rise) to 1.0 (set) inclusive.
                    frac = k / (num_points - 1)
                    t_point = ts.utc(rise_time.utc_datetime() + timedelta(seconds=duration_seconds * frac))

                    diff = satellite - observer
                    topocentric = diff.at(t_point)
                    alt, az, _ = topocentric.altaz()

                    el = alt.degrees
                    azimuth = az.degrees

                    if el > max_elevation:
                        max_elevation = el

                    # Elevation is clamped at 0 for the plotted trajectory.
                    trajectory.append({'elevation': max(0, el), 'azimuth': azimuth})

                # Only include pass if max elevation meets minimum requirement
                if max_elevation >= min_el:
                    duration_minutes = int(duration_seconds / 60)

                    # Generate ground track (sub-satellite points)
                    ground_track = []
                    for k in range(60):  # 60 points for smoother track
                        frac = k / 59
                        t_point = ts.utc(rise_time.utc_datetime() + timedelta(seconds=duration_seconds * frac))
                        geocentric = satellite.at(t_point)
                        subpoint = wgs84.subpoint(geocentric)
                        ground_track.append({
                            'lat': subpoint.latitude.degrees,
                            'lon': subpoint.longitude.degrees
                        })

                    # Get current position (at request time, not pass time)
                    current_geo = satellite.at(ts.now())
                    current_subpoint = wgs84.subpoint(current_geo)
                    current_topo = (satellite - observer).at(ts.now())
                    current_alt, current_az, _ = current_topo.altaz()

                    passes.append({
                        'satellite': sat_name,
                        'startTime': rise_time.utc_datetime().strftime('%Y-%m-%d %H:%M UTC'),
                        'maxEl': round(max_elevation, 1),
                        'duration': duration_minutes,
                        'trajectory': trajectory,
                        'groundTrack': ground_track,
                        'currentPosition': {
                            'lat': current_subpoint.latitude.degrees,
                            'lon': current_subpoint.longitude.degrees,
                            'altitude': current_geo.distance().km - 6371,  # Approx altitude
                            'elevation': current_alt.degrees,
                            'azimuth': current_az.degrees
                        },
                        'color': colors.get(sat_name, '#00ff00')
                    })

            i += 1

    # Sort by time (the zero-padded 'YYYY-MM-DD HH:MM UTC' strings sort chronologically)
    passes.sort(key=lambda p: p['startTime'])

    return jsonify({
        'status': 'success',
        'passes': passes
    })
@app.route('/satellite/position', methods=['POST'])
def get_satellite_position():
    """Report where the requested satellites are right now.

    JSON body keys: ``lat`` / ``lon`` (observer, default 51.5074 /
    -0.1278), ``satellites`` (names looked up in ``TLE_SATELLITES``,
    default empty) and ``includeTrack`` (default true). With the track
    enabled each entry also carries ``orbitTrack``: sub-satellite points
    from 45 minutes before to 45 minutes after now, one per minute.
    Satellites that are unknown or fail to compute are left out.
    """
    from datetime import datetime, timedelta

    try:
        from skyfield.api import load, wgs84, EarthSatellite
    except ImportError:
        return jsonify({'status': 'error', 'message': 'skyfield not installed'})

    payload = request.json
    observer_lat = payload.get('lat', 51.5074)
    observer_lon = payload.get('lon', -0.1278)
    requested = payload.get('satellites', [])
    want_track = payload.get('includeTrack', True)

    timescale = load.timescale()
    ground_station = wgs84.latlon(observer_lat, observer_lon)
    t_now = timescale.now()
    utc_now = t_now.utc_datetime()

    def build_orbit_track(sat):
        """Sample the ground track at 1-minute steps from -45 to +45 minutes."""
        samples = []
        for offset in range(-45, 46, 1):
            sample_time = timescale.utc(utc_now + timedelta(minutes=offset))
            try:
                below = wgs84.subpoint(sat.at(sample_time))
                samples.append({
                    'lat': below.latitude.degrees,
                    'lon': below.longitude.degrees,
                    'past': offset < 0
                })
            except:
                continue
        return samples

    positions = []
    for sat_name in requested:
        if sat_name not in TLE_SATELLITES:
            continue

        tle_entry = TLE_SATELLITES[sat_name]
        try:
            sat = EarthSatellite(tle_entry[1], tle_entry[2], tle_entry[0], timescale)

            # Position relative to the Earth's centre and the point below it.
            earth_centred = sat.at(t_now)
            ground_point = wgs84.subpoint(earth_centred)

            # Position as seen from the observer.
            elevation, bearing, slant_range = (sat - ground_station).at(t_now).altaz()

            entry = {
                'satellite': sat_name,
                'lat': ground_point.latitude.degrees,
                'lon': ground_point.longitude.degrees,
                'altitude': earth_centred.distance().km - 6371,
                'elevation': elevation.degrees,
                'azimuth': bearing.degrees,
                'distance': slant_range.km,
                'visible': elevation.degrees > 0
            }

            # Full orbit ground track (±45 minutes = ~1 orbit for LEO)
            if want_track:
                entry['orbitTrack'] = build_orbit_track(sat)

            positions.append(entry)
        except Exception:
            continue

    return jsonify({
        'status': 'success',
        'positions': positions,
        'timestamp': datetime.utcnow().isoformat()
    })
@app.route('/satellite/update-tle', methods=['POST'])
def update_tle():
    """Refresh the tracked satellites' TLE entries from CelesTrak.

    Downloads the ``stations`` and ``weather`` groups, reads them as
    three-line records (name, line 1, line 2) and overwrites the matching
    ``TLE_SATELLITES`` entries. A group that fails to download or parse is
    skipped. The response lists the internal names that were updated.
    """
    global TLE_SATELLITES

    try:
        import urllib.request

        # CelesTrak group -> internal names we accept from that group
        groups_to_fetch = {
            'stations': ['ISS'],
            'weather': ['NOAA-15', 'NOAA-18', 'NOAA-19', 'METEOR-M2']
        }

        # CelesTrak satellite name -> our internal name
        name_mappings = {
            'ISS (ZARYA)': 'ISS',
            'NOAA 15': 'NOAA-15',
            'NOAA 18': 'NOAA-18',
            'NOAA 19': 'NOAA-19',
            'METEOR-M 2': 'METEOR-M2',
            'METEOR-M2 2': 'METEOR-M2'
        }

        refreshed = []

        for group, wanted in groups_to_fetch.items():
            try:
                url = f'https://celestrak.org/NORAD/elements/gp.php?GROUP={group}&FORMAT=tle'
                req = urllib.request.Request(url, headers={'User-Agent': 'INTERCEPT/1.0'})
                with urllib.request.urlopen(req, timeout=10) as response:
                    raw_text = response.read().decode('utf-8')

                rows = [row.strip() for row in raw_text.strip().split('\n') if row.strip()]

                # Walk the non-empty rows three at a time; a trailing
                # incomplete record is dropped by zip().
                for title, first, second in zip(rows[0::3], rows[1::3], rows[2::3]):
                    if not (first.startswith('1 ') and second.startswith('2 ')):
                        continue

                    # Only keep satellites we track within this group.
                    internal_name = name_mappings.get(title)
                    if internal_name and internal_name in wanted:
                        TLE_SATELLITES[internal_name] = (title, first, second)
                        refreshed.append(internal_name)
            except Exception:
                continue

        summary = ", ".join(refreshed) if refreshed else "none"
        return jsonify({
            'status': 'success',
            'message': f'Updated TLE for: {summary}',
            'updated': refreshed
        })

    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        })
@app.route('/satellite/celestrak/<category>')
def fetch_celestrak(category):
    """Fetch TLE data from CelesTrak for a specific category.

    Args:
        category: Category name taken from the URL path. It must be one of
            the keys of ``category_map`` below.

    Returns:
        JSON with ``status``; on success also ``satellites`` (at most the
        first 50 parsed entries, each with ``id``, ``name``, ``norad`` and
        ``tle``) and ``total`` (number of entries parsed). Every parsed
        satellite, not only the first 50, is also stored in TLE_SATELLITES
        under its generated id.
    """
    global TLE_SATELLITES
    # urllib.error is imported explicitly because the handler below names it.
    import urllib.error
    import urllib.request

    # Map category names to Celestrak groups
    category_map = {
        'stations': 'stations',
        'visual': 'visual',
        'weather': 'weather',
        'noaa': 'noaa',
        'amateur': 'amateur',
        'starlink': 'starlink',
        'gps-ops': 'gps-ops',
        'iridium': 'iridium'
    }
    if category not in category_map:
        return jsonify({'status': 'error', 'message': 'Unknown category'})
    try:
        url = f'https://celestrak.org/NORAD/elements/gp.php?GROUP={category_map[category]}&FORMAT=tle'
        req = urllib.request.Request(url, headers={'User-Agent': 'INTERCEPT/1.0'})
        with urllib.request.urlopen(req, timeout=10) as response:
            tle_data = response.read().decode('utf-8')
        lines = [l.strip() for l in tle_data.strip().split('\n') if l.strip()]
        satellites = []
        # Parse TLE (3 lines per satellite)
        for i in range(0, len(lines) - 2, 3):
            name, line1, line2 = lines[i:i + 3]
            if not (line1.startswith('1 ') and line2.startswith('2 ')):
                continue
            # Columns 3-7 of TLE line 1 are read as the catalog number
            norad = line1[2:7].strip()
            # Identifier derived from the name: separators become dashes,
            # upper-cased, cut to 20 characters.
            sat_id = name.replace(' ', '-').replace('/', '-').upper()[:20]
            satellites.append({
                'id': sat_id,
                'name': name,
                'norad': norad,
                'tle': [name, line1, line2]
            })
            # Also add to TLE_SATELLITES for prediction
            TLE_SATELLITES[sat_id] = (name, line1, line2)
        # Limit to first 50 satellites to avoid overwhelming the UI
        return jsonify({
            'status': 'success',
            'satellites': satellites[:50],
            'total': len(satellites)
        })
    except urllib.error.URLError as e:
        return jsonify({'status': 'error', 'message': f'Network error: {str(e)}'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
# ============================================
# IRIDIUM ROUTES
# ============================================
@app.route('/iridium/tools')
def check_iridium_tools():
    """Report whether an Iridium decoding tool can be found on the PATH.

    Returns:
        JSON ``{'available': bool}``; true when either ``iridium-extractor``
        or ``iridium-parser`` is located by ``shutil.which``.
    """
    candidates = ('iridium-extractor', 'iridium-parser')
    found = any(shutil.which(tool) is not None for tool in candidates)
    return jsonify({'available': found})
@app.route('/iridium/start', methods=['POST'])
def start_iridium():
    """Start Iridium burst capture.

    Reads the optional JSON fields ``freq`` (default '1626.0'), ``gain``
    (default '40'), ``sampleRate`` (default '2.048e6') and ``device``
    (default '0'), launches ``rtl_fm`` with them and starts a daemon thread
    running ``monitor_iridium`` on the new process.

    Returns:
        JSON ``{'status': 'started'}`` on success, otherwise
        ``{'status': 'error', 'message': ...}`` when a capture is already
        running, no tool is installed, or the process cannot be launched.
    """
    global satellite_process
    # Hold the lock across both the "already running" check and the spawn so
    # two concurrent requests cannot each start a capture.
    with satellite_lock:
        if satellite_process and satellite_process.poll() is None:
            return jsonify({'status': 'error', 'message': 'Iridium capture already running'})

        # A request with a JSON null body yields None; fall back to defaults.
        data = request.json or {}
        freq = data.get('freq', '1626.0')
        gain = data.get('gain', '40')
        sample_rate = data.get('sampleRate', '2.048e6')
        device = data.get('device', '0')

        # Check for tools
        # NOTE(review): the command below always runs rtl_fm, so a system
        # with only iridium-extractor passes this check and then fails at
        # launch; that failure is reported by the except clause.
        if not shutil.which('iridium-extractor') and not shutil.which('rtl_fm'):
            return jsonify({
                'status': 'error',
                'message': 'Iridium tools not found. Install gr-iridium or use rtl_fm for basic capture.'
            })
        try:
            # For demo, use rtl_fm to capture L-band (iridium-extractor would be better)
            # Real implementation would pipe to iridium-extractor
            cmd = [
                'rtl_fm',
                '-f', f'{float(freq)}M',
                '-g', str(gain),
                # JSON clients may send a number; Popen only accepts strings.
                '-s', str(sample_rate),
                '-d', str(device),
                '-'
            ]
            satellite_process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            # Start monitoring thread
            thread = threading.Thread(target=monitor_iridium, args=(satellite_process,), daemon=True)
            thread.start()
            return jsonify({'status': 'started'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/iridium/stop', methods=['POST'])
def stop_iridium():
    """Stop Iridium capture.

    Terminates the capture process if one is recorded, escalating to a kill
    when it has not exited within 5 seconds, and clears the global handle.

    Returns:
        JSON ``{'status': 'stopped'}``, whether or not a capture was running.
    """
    global satellite_process
    with satellite_lock:
        if satellite_process:
            satellite_process.terminate()
            try:
                satellite_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # The process ignored the terminate request: force it down
                # and reap it so no zombie is left behind.
                satellite_process.kill()
                satellite_process.wait()
            satellite_process = None
    return jsonify({'status': 'stopped'})
@app.route('/iridium/stream')
def stream_iridium():
    """Serve Iridium bursts to the client as a server-sent event stream.

    Each item taken from ``satellite_queue`` is emitted as one ``data:``
    event; when nothing arrives within one second a keepalive event is sent
    instead. The generator never terminates on its own.
    """
    def event_source():
        keepalive = f"data: {json.dumps({'type': 'keepalive'})}\n\n"
        while True:
            try:
                payload = satellite_queue.get(timeout=1)
            except queue.Empty:
                yield keepalive
            else:
                yield f"data: {json.dumps(payload)}\n\n"

    sse = Response(event_source(), mimetype='text/event-stream')
    sse.headers['Cache-Control'] = 'no-cache'
    sse.headers['X-Accel-Buffering'] = 'no'
    return sse
def monitor_iridium(process):
    """Watch a running capture process and publish simulated burst events.

    While ``process`` is alive, reads up to 1024 bytes from its stdout and,
    for a non-empty read, emits a burst with 1% probability until 100 bursts
    have been produced. Each burst is pushed to ``satellite_queue`` and
    appended to ``iridium_bursts``. The loop pauses 0.1 s per iteration.
    Any exception is printed and ends the monitor.

    No real Iridium decoding happens here: the burst contents are randomly
    generated placeholders.
    """
    import random
    import time
    from datetime import datetime

    max_bursts = 100
    emitted = 0
    try:
        while process.poll() is None:
            chunk = process.stdout.read(1024)
            # Short-circuit order matters: the random draw only happens for
            # a non-empty read while still below the burst cap.
            if chunk and emitted < max_bursts and random.random() < 0.01:
                stamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
                freq_mhz = 1616 + random.random() * 10
                event = {
                    'type': 'burst',
                    'time': stamp,
                    'frequency': f"{freq_mhz:.3f}",
                    'data': f"Frame data (simulated) - Burst #{emitted + 1}"
                }
                satellite_queue.put(event)
                iridium_bursts.append(event)
                emitted += 1
            time.sleep(0.1)
    except Exception as e:
        print(f"[Iridium] Monitor error: {e}")
def main():
    """Print the startup banner and run the Flask server on port 5050.

    The server listens on all interfaces with threading enabled and debug
    mode off; the call blocks until the server stops.
    """
    rule = "=" * 50
    banner = (
        rule,
        " INTERCEPT // Signal Intelligence",
        " Pager / 433MHz / Aircraft / Satellite / WiFi / BT",
        rule,
        "",
        "Open http://localhost:5050 in your browser",
        "",
        "Press Ctrl+C to stop",
        "",
    )
    for text in banner:
        print(text)
    app.run(host='0.0.0.0', port=5050, debug=False, threaded=True)


# Start the server only when executed as a script, not when imported.
if __name__ == '__main__':
    main()