#!/usr/bin/env python3
"""
Pager Decoder - POCSAG/FLEX decoder using RTL-SDR and multimon-ng
"""
import subprocess
import shutil
import re
import threading
import queue
import pty
import os
import select
from flask import Flask, render_template_string, jsonify, request, Response, send_file
# Flask application object; every @app.route handler below registers on it.
app = Flask(__name__)
# Global process management
# Each *_process name holds the subprocess.Popen handle of the external tool
# started by the matching route handler, or None while that tool is idle.
current_process = None  # multimon-ng decoder (set by /start)
sensor_process = None  # rtl_433 (set by /start_sensor)
wifi_process = None  # airodump-ng (set by /wifi/scan/start and /wifi/handshake/capture)
kismet_process = None  # kismet (set by /wifi/kismet/start)
bt_process = None  # NOTE(review): presumably the Bluetooth scanner; not used in the visible code
# Event queues drained by the server-sent-event endpoints.
output_queue = queue.Queue()  # consumed by /stream
sensor_queue = queue.Queue()  # consumed by /stream_sensor
wifi_queue = queue.Queue()  # consumed by /wifi/stream
bt_queue = queue.Queue()  # NOTE(review): consumer is not in the visible code
# Locks taken by the handlers and reader threads before touching the
# corresponding *_process global (wifi_lock covers wifi_process and kismet_process).
process_lock = threading.Lock()
sensor_lock = threading.Lock()
wifi_lock = threading.Lock()
bt_lock = threading.Lock()
# Logging settings
# Both values are changed at runtime by the /logging endpoint.
logging_enabled = False
log_file_path = 'pager_messages.log'
# WiFi state
wifi_monitor_interface = None  # interface name recorded by /wifi/monitor, None when disabled
wifi_networks = {} # BSSID -> network info
wifi_clients = {} # Client MAC -> client info
wifi_handshakes = [] # Captured handshakes
# Bluetooth state
bt_interface = None
bt_devices = {} # MAC -> device info
bt_beacons = {} # MAC -> beacon info (AirTags, Tiles, iBeacons)
bt_services = {} # MAC -> list of services
# Known beacon prefixes for detection
# NOTE(review): how these prefixes are matched is not shown in the visible code.
AIRTAG_PREFIXES = ['4C:00'] # Apple continuity
TILE_PREFIXES = ['C4:E7', 'DC:54', 'E4:B0', 'F8:8A']
SAMSUNG_TRACKER = ['58:4D', 'A0:75']
# OUI Database for manufacturer lookup (common ones)
# Maps the first three octets of a MAC address (upper-case, colon-separated,
# e.g. 'B8:27:EB') to a manufacturer name. get_manufacturer() looks up
# mac[:8].upper() in this table and falls back to 'Unknown'.
# NOTE(review): entries are a hand-picked subset; verify against the IEEE
# registry before relying on any individual assignment.
OUI_DATABASE = {
'00:00:0A': 'Omron',
'00:1A:7D': 'Cyber-Blue',
'00:1E:3D': 'Alps Electric',
'00:1F:20': 'Logitech',
'00:25:DB': 'Apple',
'04:52:F3': 'Apple',
'0C:3E:9F': 'Apple',
'10:94:BB': 'Apple',
'14:99:E2': 'Apple',
'20:78:F0': 'Apple',
'28:6A:BA': 'Apple',
'3C:22:FB': 'Apple',
'40:98:AD': 'Apple',
'48:D7:05': 'Apple',
'4C:57:CA': 'Apple',
'54:4E:90': 'Apple',
'5C:97:F3': 'Apple',
'60:F8:1D': 'Apple',
'68:DB:CA': 'Apple',
'70:56:81': 'Apple',
'78:7B:8A': 'Apple',
'7C:D1:C3': 'Apple',
'84:FC:FE': 'Apple',
'8C:2D:AA': 'Apple',
'90:B0:ED': 'Apple',
'98:01:A7': 'Apple',
'98:D6:BB': 'Apple',
'A4:D1:D2': 'Apple',
'AC:BC:32': 'Apple',
'B0:34:95': 'Apple',
'B8:C1:11': 'Apple',
'C8:69:CD': 'Apple',
'D0:03:4B': 'Apple',
'DC:A9:04': 'Apple',
'E0:C7:67': 'Apple',
'F0:18:98': 'Apple',
'F4:5C:89': 'Apple',
'00:1B:66': 'Samsung',
'00:21:19': 'Samsung',
'00:26:37': 'Samsung',
'5C:0A:5B': 'Samsung',
'8C:71:F8': 'Samsung',
'C4:73:1E': 'Samsung',
'38:2C:4A': 'Samsung',
'00:1E:4C': 'Samsung',
'64:B5:C6': 'Liteon/Google',
'54:60:09': 'Google',
'00:1A:11': 'Google',
'F4:F5:D8': 'Google',
'94:EB:2C': 'Google',
'78:4F:43': 'Apple',
'F8:E4:E3': 'Tile',
'C4:E7:BE': 'Tile',
'E0:E5:CF': 'Raspberry Pi',
'B8:27:EB': 'Raspberry Pi',
'DC:A6:32': 'Raspberry Pi',
'00:0B:57': 'Silicon Wave', # BT Chips
'00:02:72': 'CC&C', # BT dongles
}
# Page template passed to render_template_string() by index(), which supplies
# the `tools` and `devices` context variables.
# NOTE(review): the string below contains only plain text (no HTML tags or
# template placeholders are visible) — confirm the markup was not lost upstream.
HTML_TEMPLATE = '''
INTERCEPT // Signal Intelligence
⚠️
DISCLAIMER
INTERCEPT is a signal intelligence tool designed for educational purposes only .
By using this software, you acknowledge and agree that:
This tool is intended for use by cyber security professionals and researchers only
You will only use this software in a controlled environment with proper authorization
Intercepting communications without consent may be illegal in your jurisdiction
You are solely responsible for ensuring compliance with all applicable laws and regulations
The developers assume no liability for misuse of this software
Only proceed if you understand and accept these terms.
I UNDERSTAND & ACCEPT
DECLINE
█████╗ ██████╗ ██████╗███████╗███████╗███████╗
██╔══██╗██╔════╝██╔════╝██╔════╝██╔════╝██╔════╝
███████║██║ ██║ █████╗ ███████╗███████╗
██╔══██║██║ ██║ ██╔══╝ ╚════██║╚════██║
██║ ██║╚██████╗╚██████╗███████╗███████║███████║
╚═╝ ╚═╝ ╚═════╝ ╚═════╝╚══════╝╚══════╝╚══════╝
██████╗ ███████╗███╗ ██╗██╗███████╗██████╗
██╔══██╗██╔════╝████╗ ██║██║██╔════╝██╔══██╗
██║ ██║█████╗ ██╔██╗ ██║██║█████╗ ██║ ██║
██║ ██║██╔══╝ ██║╚██╗██║██║██╔══╝ ██║ ██║
██████╔╝███████╗██║ ╚████║██║███████╗██████╔╝
╚═════╝ ╚══════╝╚═╝ ╚═══╝╚═╝╚══════╝╚═════╝
root@intercepted: ~# sudo access --grant-permission
[sudo] password for user: ********
Error: User is not in the sudoers file.
This incident will be reported.
"In a world of locked doors, the man with the key is king.
And you, my friend, just threw away the key."
TRY AGAIN
Channel Utilization (2.4 GHz)
Target Signal
No target selected
-- dBm
Bluetooth Proximity Radar
Tracker Detection
Monitoring for AirTags, Tiles, and other trackers...
Device intelligence data will appear here as signals are intercepted.
Configure settings and click "Start Decoding" to begin.
RECON
🔊 MUTE
⬇ AUTO-SCROLL ON
📄 CSV
📋 JSON
🔍 INTEL
Clear
'''
def check_tool(name):
    """Return True when an executable called *name* can be found on PATH."""
    resolved_path = shutil.which(name)
    if resolved_path is None:
        return False
    return True
def _parse_rtl_test_output(output):
    """Extract a list of device dicts from the combined output of ``rtl_test -t``."""
    devices = []
    # Lines look like "0:  Realtek, RTL2838UHIDIR, SN: 00000001".
    device_pattern = re.compile(r'(\d+):\s+(.+?)(?:,\s*SN:\s*(\S+))?$')
    for line in output.split('\n'):
        match = device_pattern.match(line.strip())
        if match:
            devices.append({
                'index': int(match.group(1)),
                'name': match.group(2).strip().rstrip(','),
                'serial': match.group(3) or 'N/A'
            })
    if not devices:
        # No per-device lines were recognised: fall back to the summary
        # line ("Found N device(s)") and synthesise placeholder entries.
        found_match = re.search(r'Found (\d+) device', output)
        if found_match:
            devices = [
                {'index': i, 'name': f'RTL-SDR Device {i}', 'serial': 'Unknown'}
                for i in range(int(found_match.group(1)))
            ]
    return devices


def detect_devices():
    """Detect RTL-SDR devices.

    Runs ``rtl_test -t`` (5 second timeout) and parses its output.

    Returns:
        A list of dicts with keys ``index`` (int), ``name`` and ``serial``.
        The list is empty when ``rtl_test`` is not installed, fails to run,
        or reports no devices.
    """
    if not check_tool('rtl_test'):
        return []
    try:
        result = subprocess.run(
            ['rtl_test', '-t'],
            capture_output=True,
            text=True,
            timeout=5
        )
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        # Detection stays best-effort, but the reason is reported instead of
        # being silently discarded. ValueError covers output decode errors.
        print(f"[SDR] rtl_test failed: {e}", flush=True)
        return []
    return _parse_rtl_test_output(result.stderr + result.stdout)
# Line formats emitted by multimon-ng that the parser recognises:
#   POCSAG512: Address: 1234567 Function: 0 Alpha: Message here
#   POCSAG1200: Address: 1234567 Function: 0 Numeric: 123-456-7890
#   POCSAG2400: Address: 1234567 Function: 0            (no message)
#   FLEX: NNNN-NN-NN NN:NN:NN NNNN/NN/C NN.NNN [NNNNNNN] ALN Message here
#   FLEX|NNNN-NN-NN|NN:NN:NN|NNNN/NN/C|NN.NNN|NNNNNNN|ALN|Message
_POCSAG_WITH_BODY = re.compile(
    r'(POCSAG\d+):\s*Address:\s*(\d+)\s+Function:\s*(\d+)\s+(Alpha|Numeric):\s*(.*)'
)
_POCSAG_ADDRESS_ONLY = re.compile(
    r'(POCSAG\d+):\s*Address:\s*(\d+)\s+Function:\s*(\d+)\s*$'
)
_FLEX_FULL = re.compile(
    r'FLEX[:\|]\s*[\d\-]+[\s\|]+[\d:]+[\s\|]+([\d/A-Z]+)[\s\|]+([\d.]+)[\s\|]+\[?(\d+)\]?[\s\|]+(\w+)[\s\|]+(.*)'
)
_FLEX_FALLBACK = re.compile(r'FLEX:\s*(.+)')


def parse_multimon_output(line):
    """Turn one line of multimon-ng output into a message dict.

    Returns a dict with the keys ``protocol``, ``address``, ``function``,
    ``msg_type`` and ``message``, or None when the line is not a recognised
    POCSAG or FLEX record.
    """
    text = line.strip()

    # POCSAG record carrying an Alpha or Numeric payload.
    hit = _POCSAG_WITH_BODY.match(text)
    if hit:
        protocol, address, function, msg_type, body = hit.groups()
        return {
            'protocol': protocol,
            'address': address,
            'function': function,
            'msg_type': msg_type,
            'message': body.strip() or '[No Message]'
        }

    # POCSAG record with an address but no payload.
    hit = _POCSAG_ADDRESS_ONLY.match(text)
    if hit:
        protocol, address, function = hit.groups()
        return {
            'protocol': protocol,
            'address': address,
            'function': function,
            'msg_type': 'Tone',
            'message': '[Tone Only]'
        }

    # Fully structured FLEX record (space- or pipe-separated).
    hit = _FLEX_FULL.match(text)
    if hit:
        cycle_info, _frequency, capcode, msg_type, body = hit.groups()
        return {
            'protocol': 'FLEX',
            'address': capcode,
            'function': cycle_info,
            'msg_type': msg_type,
            'message': body.strip() or '[No Message]'
        }

    # Any other "FLEX:" line is passed through as an untyped message.
    hit = _FLEX_FALLBACK.match(text)
    if hit:
        return {
            'protocol': 'FLEX',
            'address': 'Unknown',
            'function': '',
            'msg_type': 'Unknown',
            'message': hit.group(1).strip()
        }

    return None
def stream_decoder(master_fd, process):
    """Stream decoder output to queue using PTY for unbuffered output.

    Runs in a background thread. Reads multimon-ng output from the PTY master
    ``master_fd``, splits it into lines, and puts one event per line on
    ``output_queue``: a ``message`` event (also passed to ``log_message``)
    when the line parses as a pager record, otherwise a ``raw`` event.

    The loop ends on EOF, on a read/select error, or once ``process`` has
    exited. The thread then closes ``master_fd``, reaps ``process`` and emits
    a ``stopped`` status event.

    Args:
        master_fd: PTY master file descriptor connected to multimon-ng.
        process: the multimon-ng ``subprocess.Popen`` handle.
    """
    global current_process
    # Imported once here rather than once per decoded line.
    from datetime import datetime
    try:
        output_queue.put({'type': 'status', 'text': 'started'})
        buffer = ""
        while True:
            try:
                # 1 s timeout so the process.poll() check below runs regularly.
                ready, _, _ = select.select([master_fd], [], [], 1.0)
            except Exception:
                break
            if ready:
                try:
                    data = os.read(master_fd, 1024)
                except OSError:
                    # Reading a PTY master fails once the slave side is closed.
                    break
                if not data:
                    break
                buffer += data.decode('utf-8', errors='replace')
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    line = line.strip()
                    if not line:
                        continue
                    parsed = parse_multimon_output(line)
                    if parsed:
                        parsed['timestamp'] = datetime.now().strftime('%H:%M:%S')
                        output_queue.put({'type': 'message', **parsed})
                        log_message(parsed)
                    else:
                        output_queue.put({'type': 'raw', 'text': line})
            if process.poll() is not None:
                break
    except Exception as e:
        output_queue.put({'type': 'error', 'text': str(e)})
    finally:
        try:
            os.close(master_fd)
        except OSError:
            pass
        process.wait()
        output_queue.put({'type': 'status', 'text': 'stopped'})
        with process_lock:
            # Only clear the global if it still refers to *this* decoder.
            # Otherwise a decoder started right after a stop would lose its
            # handle and could no longer be stopped.
            if current_process is process:
                current_process = None
@app.route('/')
def index():
    """Render the main page with tool availability and detected SDR devices."""
    # Template key -> executable name probed on PATH.
    probes = (
        ('rtl_fm', 'rtl_fm'),
        ('multimon', 'multimon-ng'),
        ('rtl_433', 'rtl_433'),
    )
    tools = {key: check_tool(binary) for key, binary in probes}
    return render_template_string(
        HTML_TEMPLATE,
        tools=tools,
        devices=detect_devices(),
    )
@app.route('/favicon.svg')
def favicon():
    """Serve the site icon from the file ``favicon.svg``."""
    icon_path = 'favicon.svg'
    return send_file(icon_path, mimetype='image/svg+xml')
@app.route('/devices')
def get_devices():
    """Return the currently detected RTL-SDR devices as a JSON array."""
    detected = detect_devices()
    return jsonify(detected)
def _abort_decoder_startup(processes, fds):
    """Best-effort cleanup of a decoder pipeline that failed part-way through startup."""
    for proc in processes:
        try:
            proc.kill()
            proc.wait(timeout=2)
        except (OSError, subprocess.SubprocessError):
            pass
    for fd in fds:
        try:
            os.close(fd)
        except OSError:
            pass


@app.route('/start', methods=['POST'])
def start_decoding():
    """Start the ``rtl_fm | multimon-ng`` pager decoding pipeline.

    Reads an optional JSON object with ``frequency`` (MHz), ``gain``,
    ``squelch``, ``ppm``, ``device`` and ``protocols`` (any of POCSAG512,
    POCSAG1200, POCSAG2400, FLEX). Missing keys fall back to defaults, and a
    missing or non-object body is treated as ``{}``.

    Returns JSON ``{'status': 'started', 'command': ...}`` on success, or
    ``{'status': 'error', 'message': ...}`` if a decoder is already running
    or a tool could not be launched.
    """
    global current_process
    with process_lock:
        if current_process:
            return jsonify({'status': 'error', 'message': 'Already running'})
        # silent=True: a missing/invalid body yields None instead of an HTTP error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        freq = data.get('frequency', '929.6125')
        gain = data.get('gain', '0')
        squelch = data.get('squelch', '0')
        ppm = data.get('ppm', '0')
        device = data.get('device', '0')
        protocols = data.get('protocols', ['POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX'])
        if not isinstance(protocols, (list, tuple)):
            protocols = ['POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX']
        # Clear queue so the client does not see events from a previous run.
        while True:
            try:
                output_queue.get_nowait()
            except queue.Empty:
                break
        # Build multimon-ng decoder arguments; unknown protocol names are ignored.
        supported = ('POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX')
        decoders = []
        for proto in protocols:
            if proto in supported:
                decoders.extend(['-a', proto])
        # Build rtl_fm command
        # rtl_fm -d <dev> -f <freq>M -M fm -s 22050 [-g gain] [-p ppm] [-l squelch] -
        #   | multimon-ng -t raw -a POCSAG512 -a POCSAG1200 -a FLEX -f alpha -
        rtl_cmd = [
            'rtl_fm',
            '-d', str(device),
            '-f', f'{freq}M',
            '-M', 'fm',
            '-s', '22050',
        ]
        if gain and gain != '0':
            rtl_cmd.extend(['-g', str(gain)])
        if ppm and ppm != '0':
            rtl_cmd.extend(['-p', str(ppm)])
        if squelch and squelch != '0':
            rtl_cmd.extend(['-l', str(squelch)])
        rtl_cmd.append('-')
        multimon_cmd = ['multimon-ng', '-t', 'raw'] + decoders + ['-f', 'alpha', '-']
        # Log the command being run
        full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd)
        print(f"Running: {full_cmd}")

        started = []   # processes launched so far (killed again if startup fails)
        open_fds = []  # PTY descriptors this function still owns
        try:
            # Create pipe: rtl_fm | multimon-ng
            rtl_process = subprocess.Popen(
                rtl_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            started.append(rtl_process)

            # Forward rtl_fm's stderr to the console and to the client.
            def monitor_rtl_stderr():
                for line in rtl_process.stderr:
                    err_text = line.decode('utf-8', errors='replace').strip()
                    if err_text:
                        print(f"[RTL_FM] {err_text}", flush=True)
                        output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'})

            rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr)
            rtl_stderr_thread.daemon = True
            rtl_stderr_thread.start()

            # Create a pseudo-terminal for multimon-ng output.
            # This tricks it into thinking it's connected to a terminal,
            # which disables output buffering.
            master_fd, slave_fd = pty.openpty()
            open_fds = [master_fd, slave_fd]
            multimon_process = subprocess.Popen(
                multimon_cmd,
                stdin=rtl_process.stdout,
                stdout=slave_fd,
                stderr=slave_fd,
                close_fds=True
            )
            started.append(multimon_process)
            os.close(slave_fd)  # Close slave fd in parent process
            open_fds.remove(slave_fd)
            rtl_process.stdout.close()  # Allow rtl_process to receive SIGPIPE

            current_process = multimon_process
            current_process._rtl_process = rtl_process  # Store reference to kill later
            current_process._master_fd = master_fd  # Store for cleanup

            # Start output thread with PTY master fd
            thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process))
            thread.daemon = True
            thread.start()
            open_fds.remove(master_fd)  # the reader thread now owns and closes it

            # Send the command info to the client
            output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
            return jsonify({'status': 'started', 'command': full_cmd})
        except FileNotFoundError as e:
            # Without cleanup, an already-started rtl_fm keeps holding the SDR
            # device and the PTY descriptors leak.
            _abort_decoder_startup(started, open_fds)
            current_process = None
            return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
        except Exception as e:
            _abort_decoder_startup(started, open_fds)
            current_process = None
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/stop', methods=['POST'])
def stop_decoding():
    """Stop the running ``rtl_fm | multimon-ng`` pipeline, if any.

    Terminates rtl_fm first, then multimon-ng, escalating to ``kill`` if
    either does not exit within 2 seconds.

    Returns JSON ``{'status': 'stopped'}`` or ``{'status': 'not_running'}``.
    """
    global current_process
    with process_lock:
        if not current_process:
            return jsonify({'status': 'not_running'})

        # Kill rtl_fm process first
        rtl_process = getattr(current_process, '_rtl_process', None)
        if rtl_process is not None:
            try:
                rtl_process.terminate()
                rtl_process.wait(timeout=2)
            except (subprocess.TimeoutExpired, OSError):
                try:
                    rtl_process.kill()
                except OSError:
                    pass

        # The PTY master fd is deliberately NOT closed here. The
        # stream_decoder thread owns it and closes it once it sees the process
        # exit. Closing it from both places risks the second close hitting an
        # unrelated descriptor that reused the same number.

        # Kill multimon-ng
        current_process.terminate()
        try:
            current_process.wait(timeout=2)
        except subprocess.TimeoutExpired:
            current_process.kill()
        current_process = None
        return jsonify({'status': 'stopped'})
@app.route('/status')
def get_status():
    """Report whether the decoder is alive, plus the current logging settings."""
    with process_lock:
        # A handle whose poll() returns None belongs to a still-running process.
        is_running = bool(current_process) and current_process.poll() is None
        return jsonify({
            'running': is_running,
            'logging': logging_enabled,
            'log_file': log_file_path,
        })
@app.route('/logging', methods=['POST'])
def toggle_logging():
    """Toggle message logging.

    Accepts an optional JSON object with ``enabled`` (switches logging on or
    off) and ``log_file`` (non-empty path of the log file). A missing or
    non-object body changes nothing.

    Returns JSON with the resulting ``logging`` flag and ``log_file`` path.
    """
    global logging_enabled, log_file_path
    # silent=True: a missing/invalid body yields None instead of an HTTP error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    if 'enabled' in data:
        logging_enabled = data['enabled']
    if 'log_file' in data and data['log_file']:
        # NOTE(review): the client chooses an arbitrary path that the server
        # process will append to; restrict this if the app is ever exposed to
        # untrusted clients.
        log_file_path = data['log_file']
    return jsonify({'logging': logging_enabled, 'log_file': log_file_path})
def log_message(msg):
    """Log a message to file if logging is enabled.

    Appends one line of the form ``timestamp | protocol | address | message``
    to ``log_file_path``. Failures are reported on stdout and never raised.

    Args:
        msg: parsed pager message dict (keys ``protocol``, ``address``,
            ``message``; missing keys fall back to placeholders).
    """
    if not logging_enabled:
        return
    from datetime import datetime
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    entry = f"{timestamp} | {msg.get('protocol', 'UNKNOWN')} | {msg.get('address', '')} | {msg.get('message', '')}\n"
    try:
        # Explicit UTF-8: decoded pager text can contain non-ASCII characters
        # that the locale's default encoding may be unable to write.
        with open(log_file_path, 'a', encoding='utf-8') as f:
            f.write(entry)
    except Exception as e:
        print(f"[ERROR] Failed to log message: {e}", flush=True)
@app.route('/killall', methods=['POST'])
def kill_all():
    """Kill all decoder and WiFi processes.

    Runs ``pkill -f <name>`` for each known tool and clears the process
    handles held by this module.

    Returns JSON ``{'status': 'killed', 'processes': [...]}`` listing the
    tool names for which pkill reported at least one match.
    """
    global current_process, sensor_process, wifi_process, kismet_process
    killed = []
    # NOTE(review): `pkill -f` matches against the full command line, so any
    # process whose arguments merely contain one of these names is also hit.
    processes_to_kill = [
        'rtl_fm', 'multimon-ng', 'rtl_433',
        'airodump-ng', 'aireplay-ng', 'airmon-ng', 'kismet'
    ]
    for proc in processes_to_kill:
        try:
            result = subprocess.run(['pkill', '-f', proc], capture_output=True)
        except OSError:
            # pkill missing or not executable: nothing to do for this name.
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            continue
        if result.returncode == 0:
            killed.append(proc)
    with process_lock:
        current_process = None
    with sensor_lock:
        sensor_process = None
    with wifi_lock:
        wifi_process = None
        kismet_process = None
    return jsonify({'status': 'killed', 'processes': killed})
@app.route('/stream')
def stream():
    """Server-sent-events endpoint that relays decoder queue items to the client."""
    def generate():
        import json
        while True:
            try:
                payload = output_queue.get(timeout=1)
            except queue.Empty:
                # Nothing arrived within a second: emit a keepalive event.
                payload = {'type': 'keepalive'}
            yield f"data: {json.dumps(payload)}\n\n"

    response = Response(generate(), mimetype='text/event-stream')
    for header_name, header_value in (
        ('Cache-Control', 'no-cache'),
        ('X-Accel-Buffering', 'no'),
        ('Connection', 'keep-alive'),
    ):
        response.headers[header_name] = header_value
    return response
# ============== RTL_433 SENSOR ROUTES ==============
def stream_sensor_output(process):
    """Stream rtl_433 JSON output to queue.

    Runs in a background thread. Reads ``process.stdout`` line by line. Each
    line that decodes to a JSON object is tagged ``type='sensor'`` and put on
    ``sensor_queue`` (and appended to the log file when logging is enabled).
    Every other line is forwarded as a ``raw`` event. When the stream ends the
    process is reaped and a ``stopped`` status event is emitted.

    Args:
        process: the rtl_433 ``subprocess.Popen`` handle (binary stdout pipe).
    """
    global sensor_process
    import json as json_module
    from datetime import datetime
    try:
        sensor_queue.put({'type': 'status', 'text': 'started'})
        for line in iter(process.stdout.readline, b''):
            line = line.decode('utf-8', errors='replace').strip()
            if not line:
                continue
            try:
                # rtl_433 outputs JSON objects, one per line
                data = json_module.loads(line)
            except json_module.JSONDecodeError:
                data = None
            if not isinstance(data, dict):
                # Not a JSON object (plain text, or a bare JSON scalar/array):
                # send as raw. Tagging such a value would raise and end the
                # whole stream.
                sensor_queue.put({'type': 'raw', 'text': line})
                continue
            data['type'] = 'sensor'
            sensor_queue.put(data)
            # Log if enabled
            if logging_enabled:
                try:
                    with open(log_file_path, 'a', encoding='utf-8') as f:
                        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        f.write(f"{timestamp} | {data.get('model', 'Unknown')} | {json_module.dumps(data)}\n")
                except Exception as e:
                    # Logging is best-effort, but report why it failed.
                    print(f"[ERROR] Failed to log sensor reading: {e}", flush=True)
    except Exception as e:
        sensor_queue.put({'type': 'error', 'text': str(e)})
    finally:
        process.wait()
        sensor_queue.put({'type': 'status', 'text': 'stopped'})
        with sensor_lock:
            # Only clear the global if it still refers to *this* process, so
            # a sensor started right after a stop keeps its handle.
            if sensor_process is process:
                sensor_process = None
@app.route('/start_sensor', methods=['POST'])
def start_sensor():
    """Start rtl_433 and stream its JSON output to ``sensor_queue``.

    Reads an optional JSON object with ``frequency`` (MHz), ``gain``, ``ppm``
    and ``device``. Missing keys fall back to defaults, and a missing or
    non-object body is treated as ``{}``.

    Returns JSON ``{'status': 'started', 'command': ...}`` on success, or
    ``{'status': 'error', 'message': ...}`` if a sensor is already running or
    rtl_433 could not be launched.
    """
    global sensor_process
    with sensor_lock:
        if sensor_process:
            return jsonify({'status': 'error', 'message': 'Sensor already running'})
        # silent=True: a missing/invalid body yields None instead of an HTTP error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        freq = data.get('frequency', '433.92')
        gain = data.get('gain', '0')
        ppm = data.get('ppm', '0')
        device = data.get('device', '0')
        # Clear queue so the client does not see events from a previous run.
        while True:
            try:
                sensor_queue.get_nowait()
            except queue.Empty:
                break
        # Build rtl_433 command
        # rtl_433 -d <dev> -f <freq>M -F json [-g gain] [-p ppm]
        cmd = [
            'rtl_433',
            '-d', str(device),
            '-f', f'{freq}M',
            '-F', 'json'
        ]
        if gain and gain != '0':
            cmd.extend(['-g', str(gain)])
        if ppm and ppm != '0':
            cmd.extend(['-p', str(ppm)])
        full_cmd = ' '.join(cmd)
        print(f"Running: {full_cmd}")
        try:
            # No bufsize=1: line buffering is not supported for binary pipes
            # (Python warns and falls back to the default buffer size).
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            sensor_process = proc
            # Start output thread
            thread = threading.Thread(target=stream_sensor_output, args=(proc,))
            thread.daemon = True
            thread.start()

            # Monitor stderr. The closure captures the local handle because
            # the global may already be reset to None when this thread runs.
            def monitor_stderr():
                for line in proc.stderr:
                    err = line.decode('utf-8', errors='replace').strip()
                    if err:
                        print(f"[rtl_433] {err}")
                        sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'})

            stderr_thread = threading.Thread(target=monitor_stderr)
            stderr_thread.daemon = True
            stderr_thread.start()
            sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
            return jsonify({'status': 'started', 'command': full_cmd})
        except FileNotFoundError:
            return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/stop_sensor', methods=['POST'])
def stop_sensor():
    """Terminate rtl_433 if it is running and report the outcome as JSON."""
    global sensor_process
    with sensor_lock:
        if not sensor_process:
            return jsonify({'status': 'not_running'})
        running = sensor_process
        running.terminate()
        try:
            running.wait(timeout=2)
        except subprocess.TimeoutExpired:
            # Did not exit within two seconds: force it.
            running.kill()
        sensor_process = None
        return jsonify({'status': 'stopped'})
@app.route('/stream_sensor')
def stream_sensor():
    """Server-sent-events endpoint that relays sensor queue items to the client."""
    def generate():
        import json
        while True:
            try:
                payload = sensor_queue.get(timeout=1)
            except queue.Empty:
                # Nothing arrived within a second: emit a keepalive event.
                payload = {'type': 'keepalive'}
            yield f"data: {json.dumps(payload)}\n\n"

    response = Response(generate(), mimetype='text/event-stream')
    for header_name, header_value in (
        ('Cache-Control', 'no-cache'),
        ('X-Accel-Buffering', 'no'),
        ('Connection', 'keep-alive'),
    ):
        response.headers[header_name] = header_value
    return response
# ============== WIFI RECONNAISSANCE ROUTES ==============
def _wifi_interfaces_macos():
    """Collect WiFi interfaces on macOS via networksetup and system_profiler."""
    found = []
    try:
        # Get list of network interfaces
        listing = subprocess.run(['networksetup', '-listallhardwareports'],
                                 capture_output=True, text=True, timeout=5)
        rows = listing.stdout.split('\n')
        for pos, row in enumerate(rows):
            if 'Wi-Fi' not in row and 'AirPort' not in row:
                continue
            # The "Device:" entry is expected on one of the next two lines.
            for candidate in rows[pos + 1:pos + 3]:
                if 'Device:' in candidate:
                    found.append({
                        'name': candidate.split('Device:')[1].strip(),
                        'type': 'internal',
                        'monitor_capable': False,  # macOS internal usually can't
                        'status': 'up'
                    })
                    break
    except Exception as e:
        print(f"[WiFi] Error detecting macOS interfaces: {e}")
    # Check for USB WiFi adapters
    try:
        usb_report = subprocess.run(['system_profiler', 'SPUSBDataType'],
                                    capture_output=True, text=True, timeout=10)
        if any(marker in usb_report.stdout for marker in ('Wireless', 'WLAN', '802.11')):
            found.append({
                'name': 'USB WiFi Adapter',
                'type': 'usb',
                'monitor_capable': True,
                'status': 'detected'
            })
    except Exception:
        pass
    return found


def _wifi_interfaces_linux():
    """Collect WiFi interfaces on Linux via ``iw dev``, falling back to ``iwconfig``."""
    found = []
    try:
        # Use iw to list wireless interfaces
        listing = subprocess.run(['iw', 'dev'], capture_output=True, text=True, timeout=5)
        pending_name = None
        for row in listing.stdout.split('\n'):
            row = row.strip()
            if row.startswith('Interface'):
                pending_name = row.split()[1]
            elif pending_name and 'type' in row:
                found.append({
                    'name': pending_name,
                    'type': row.split()[-1],
                    'monitor_capable': True,
                    'status': 'up'
                })
                pending_name = None
    except FileNotFoundError:
        # Try iwconfig instead
        try:
            listing = subprocess.run(['iwconfig'], capture_output=True, text=True, timeout=5)
            for row in listing.stdout.split('\n'):
                if 'IEEE 802.11' in row:
                    found.append({
                        'name': row.split()[0],
                        'type': 'managed',
                        'monitor_capable': True,
                        'status': 'up'
                    })
        except Exception:
            pass
    except Exception as e:
        print(f"[WiFi] Error detecting Linux interfaces: {e}")
    return found


def detect_wifi_interfaces():
    """Return a list of dicts describing the WiFi interfaces found on this host.

    Each dict has the keys ``name``, ``type``, ``monitor_capable`` and
    ``status``. macOS is probed when ``platform.system()`` is ``'Darwin'``;
    every other platform is treated as Linux.
    """
    import platform
    if platform.system() == 'Darwin':  # macOS
        return _wifi_interfaces_macos()
    return _wifi_interfaces_linux()
@app.route('/wifi/interfaces')
def get_wifi_interfaces():
    """Return detected WiFi interfaces, tool availability and the monitor interface."""
    interfaces = detect_wifi_interfaces()
    # Response key -> executable name probed on PATH.
    probes = (
        ('airmon', 'airmon-ng'),
        ('airodump', 'airodump-ng'),
        ('aireplay', 'aireplay-ng'),
        ('kismet', 'kismet'),
        ('iw', 'iw'),
    )
    tools = {key: check_tool(binary) for key, binary in probes}
    return jsonify({
        'interfaces': interfaces,
        'tools': tools,
        'monitor_interface': wifi_monitor_interface,
    })
def _parse_airmon_monitor_interface(output, interface):
    """Return the monitor interface name announced in ``airmon-ng start`` output.

    airmon-ng reports e.g.
    ``(mac80211 monitor mode vif enabled for [phy0]wlan0 on [phy0]wlan0mon)``
    or ``(monitor mode enabled on mon0)``. The interface is the last name
    that follows "on"/"for" after the word "enabled". Falls back to
    ``interface + 'mon'`` when no such line is found.
    """
    status = re.search(r'monitor mode[^\n]*?enabled([^\n]*)', output, re.IGNORECASE)
    if status:
        # An optional "[phyN]" prefix is skipped. Names must start with a
        # letter so a bare channel number is never mistaken for an interface.
        names = re.findall(r'\b(?:on|for)\s+(?:\[\w+\])?([A-Za-z][\w.-]*)', status.group(1))
        if names:
            return names[-1]
    # Assume it's interface + 'mon'
    return interface + 'mon'


@app.route('/wifi/monitor', methods=['POST'])
def toggle_monitor_mode():
    """Enable or disable monitor mode on an interface.

    Expects a JSON object with ``interface`` (required) and ``action``
    (``'start'`` by default; any other value disables monitor mode).
    airmon-ng is preferred, with ``iw``/``ip`` as the fallback.

    Returns JSON with ``status`` ``'success'`` or ``'error'``. A successful
    start also carries ``monitor_interface``.
    """
    global wifi_monitor_interface
    # silent=True: a missing/invalid body yields None instead of an HTTP error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    interface = data.get('interface')
    action = data.get('action', 'start')  # 'start' or 'stop'
    if not interface:
        return jsonify({'status': 'error', 'message': 'No interface specified'})
    if action == 'start':
        # Try airmon-ng first
        if check_tool('airmon-ng'):
            try:
                # Kill interfering processes
                subprocess.run(['airmon-ng', 'check', 'kill'], capture_output=True, timeout=10)
                # Start monitor mode
                result = subprocess.run(['airmon-ng', 'start', interface],
                                        capture_output=True, text=True, timeout=15)
                # Parse output to find monitor interface name
                output = result.stdout + result.stderr
                wifi_monitor_interface = _parse_airmon_monitor_interface(output, interface)
                wifi_queue.put({'type': 'info', 'text': f'Monitor mode enabled on {wifi_monitor_interface}'})
                return jsonify({'status': 'success', 'monitor_interface': wifi_monitor_interface})
            except Exception as e:
                return jsonify({'status': 'error', 'message': str(e)})
        # Fallback to iw (Linux)
        elif check_tool('iw'):
            try:
                subprocess.run(['ip', 'link', 'set', interface, 'down'], capture_output=True)
                subprocess.run(['iw', interface, 'set', 'monitor', 'control'], capture_output=True)
                subprocess.run(['ip', 'link', 'set', interface, 'up'], capture_output=True)
                wifi_monitor_interface = interface
                return jsonify({'status': 'success', 'monitor_interface': interface})
            except Exception as e:
                return jsonify({'status': 'error', 'message': str(e)})
        else:
            return jsonify({'status': 'error', 'message': 'No monitor mode tools available (need airmon-ng or iw)'})
    else:  # stop
        if check_tool('airmon-ng'):
            try:
                subprocess.run(['airmon-ng', 'stop', wifi_monitor_interface or interface],
                               capture_output=True, text=True, timeout=15)
                wifi_monitor_interface = None
                return jsonify({'status': 'success', 'message': 'Monitor mode disabled'})
            except Exception as e:
                return jsonify({'status': 'error', 'message': str(e)})
        elif check_tool('iw'):
            try:
                subprocess.run(['ip', 'link', 'set', interface, 'down'], capture_output=True)
                subprocess.run(['iw', interface, 'set', 'type', 'managed'], capture_output=True)
                subprocess.run(['ip', 'link', 'set', interface, 'up'], capture_output=True)
                wifi_monitor_interface = None
                return jsonify({'status': 'success', 'message': 'Monitor mode disabled'})
            except Exception as e:
                return jsonify({'status': 'error', 'message': str(e)})
    return jsonify({'status': 'error', 'message': 'Unknown action'})
def parse_airodump_csv(csv_path):
    """Parse airodump-ng CSV output file.

    The file holds two blank-line-separated sections: access points (header
    starting with ``BSSID``) and stations (header starting with
    ``Station MAC``).

    Args:
        csv_path: path of the ``*-01.csv`` file written by airodump-ng.

    Returns:
        ``(networks, clients)``: two dicts keyed by BSSID and by station MAC.
        All field values are kept as strings. Networks with an empty ESSID are
        reported as ``'Hidden'``. Both dicts are empty if the file cannot be
        read (the error is printed, not raised).
    """
    networks = {}
    clients = {}
    try:
        with open(csv_path, 'r', errors='replace') as f:
            content = f.read()
        # Split into networks and clients sections
        for section in content.split('\n\n'):
            lines = section.strip().split('\n')
            header = lines[0]
            # The station header must be tested first: it also contains
            # "BSSID" and "Probed ESSIDs", so it would otherwise be taken for
            # the networks section and no client would ever be parsed.
            if 'Station MAC' in header:
                for line in lines[1:]:
                    parts = [p.strip() for p in line.split(',')]
                    if len(parts) < 6:
                        continue
                    station = parts[0]
                    if not station or ':' not in station:
                        continue
                    clients[station] = {
                        'mac': station,
                        'first_seen': parts[1],
                        'last_seen': parts[2],
                        'power': parts[3],
                        'packets': parts[4],
                        'bssid': parts[5],
                        # Every remaining field is a probed ESSID.
                        'probes': ','.join(p for p in parts[6:] if p)
                    }
            elif 'BSSID' in header and 'ESSID' in header:
                for line in lines[1:]:
                    raw = line.split(',')
                    parts = [p.strip() for p in raw]
                    if len(parts) < 14:
                        continue
                    bssid = parts[0]
                    if not bssid or ':' not in bssid:
                        continue
                    if len(raw) > 14:
                        # ESSID is everything between the ID-length column and
                        # the trailing Key column, so commas inside the name
                        # are preserved.
                        essid = ','.join(raw[13:-1]).strip()
                    else:
                        essid = parts[13]
                    networks[bssid] = {
                        'bssid': bssid,
                        'first_seen': parts[1],
                        'last_seen': parts[2],
                        'channel': parts[3],
                        'speed': parts[4],
                        'privacy': parts[5],
                        'cipher': parts[6],
                        'auth': parts[7],
                        'power': parts[8],
                        'beacons': parts[9],
                        'ivs': parts[10],
                        'lan_ip': parts[11],
                        'essid': essid or 'Hidden'
                    }
    except Exception as e:
        print(f"[WiFi] Error parsing CSV: {e}")
    return networks, clients
def stream_airodump_output(process, csv_path):
    """Stream airodump-ng output to queue.

    Runs in a background thread while ``process`` is alive. Every 2 seconds it
    re-parses ``<csv_path>-01.csv`` and puts ``network`` events
    (``action`` ``'new'`` or ``'update'``) and ``client`` events (new clients
    only) on ``wifi_queue``. It then replaces the ``wifi_networks`` and
    ``wifi_clients`` snapshots. A ``stopped`` status event is emitted when the
    process ends.

    Args:
        process: the airodump-ng ``subprocess.Popen`` handle.
        csv_path: the ``-w`` prefix given to airodump-ng (without ``-01.csv``).
    """
    global wifi_process, wifi_networks, wifi_clients
    import time
    csv_file = csv_path + '-01.csv'
    try:
        wifi_queue.put({'type': 'status', 'text': 'started'})
        last_parse = 0
        while process.poll() is None:
            # Parse CSV file periodically
            current_time = time.time()
            if current_time - last_parse >= 2:  # Parse every 2 seconds
                if os.path.exists(csv_file):
                    networks, clients = parse_airodump_csv(csv_file)
                    # Networks: announce unseen BSSIDs as new, the rest as updates.
                    for bssid, net in networks.items():
                        action = 'update' if bssid in wifi_networks else 'new'
                        wifi_queue.put({'type': 'network', 'action': action, **net})
                    # Clients: only first sightings are announced.
                    for mac, client in clients.items():
                        if mac not in wifi_clients:
                            wifi_queue.put({'type': 'client', 'action': 'new', **client})
                    wifi_networks = networks
                    wifi_clients = clients
                last_parse = current_time
            time.sleep(0.5)
    except Exception as e:
        wifi_queue.put({'type': 'error', 'text': str(e)})
    finally:
        process.wait()
        wifi_queue.put({'type': 'status', 'text': 'stopped'})
        with wifi_lock:
            # Only clear the global if it still refers to *this* process, so a
            # scan started right after a stop keeps its handle.
            if wifi_process is process:
                wifi_process = None
@app.route('/wifi/scan/start', methods=['POST'])
def start_wifi_scan():
    """Start WiFi scanning with airodump-ng.

    Reads an optional JSON object with ``interface`` (defaults to the current
    monitor interface), ``channel`` (omit for channel hopping) and ``band``
    (``'a'`` = 5GHz, ``'bg'`` = 2.4GHz, ``'abg'`` = both, the default).

    Returns JSON ``{'status': 'started', 'interface': ...}`` on success, or
    ``{'status': 'error', 'message': ...}`` otherwise.
    """
    global wifi_process, wifi_networks, wifi_clients
    with wifi_lock:
        if wifi_process:
            return jsonify({'status': 'error', 'message': 'Scan already running'})
        # silent=True: a missing/invalid body yields None instead of an HTTP error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        interface = data.get('interface') or wifi_monitor_interface
        channel = data.get('channel')  # None = channel hopping
        band = data.get('band', 'abg')  # 'a' = 5GHz, 'bg' = 2.4GHz, 'abg' = both
        if not interface:
            return jsonify({'status': 'error', 'message': 'No monitor interface available. Enable monitor mode first.'})
        # Clear previous data
        wifi_networks = {}
        wifi_clients = {}
        # Clear queue
        while True:
            try:
                wifi_queue.get_nowait()
            except queue.Empty:
                break
        # Build airodump-ng command
        csv_path = '/tmp/intercept_wifi'
        # Remove old files so the new run writes the "-01" set again.
        for stale in ('/tmp/intercept_wifi-01.csv', '/tmp/intercept_wifi-01.cap',
                      '/tmp/intercept_wifi-01.kismet.csv', '/tmp/intercept_wifi-01.kismet.netxml'):
            try:
                os.remove(stale)
            except OSError:
                pass
        cmd = [
            'airodump-ng',
            '-w', csv_path,
            '--output-format', 'csv,pcap',
            '--band', band,
        ]
        if channel:
            cmd.extend(['-c', str(channel)])
        # Options go before the positional interface so they are parsed even
        # by getopt implementations that stop at the first non-option.
        cmd.append(interface)
        print(f"[WiFi] Running: {' '.join(cmd)}")
        try:
            # Results are read from the CSV file, never from the pipes. An
            # undrained PIPE would eventually fill and block airodump-ng, so
            # its terminal output is discarded instead.
            wifi_process = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )
            # Start parsing thread
            thread = threading.Thread(target=stream_airodump_output, args=(wifi_process, csv_path))
            thread.daemon = True
            thread.start()
            wifi_queue.put({'type': 'info', 'text': f'Started scanning on {interface}'})
            return jsonify({'status': 'started', 'interface': interface})
        except FileNotFoundError:
            return jsonify({'status': 'error', 'message': 'airodump-ng not found. Install aircrack-ng suite.'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/wifi/scan/stop', methods=['POST'])
def stop_wifi_scan():
    """Terminate airodump-ng if it is running and report the outcome as JSON."""
    global wifi_process
    with wifi_lock:
        if not wifi_process:
            return jsonify({'status': 'not_running'})
        running = wifi_process
        running.terminate()
        try:
            running.wait(timeout=3)
        except subprocess.TimeoutExpired:
            # Did not exit within three seconds: force it.
            running.kill()
        wifi_process = None
        return jsonify({'status': 'stopped'})
@app.route('/wifi/deauth', methods=['POST'])
def send_deauth():
    """Send deauthentication packets to force handshake capture.

    Expects a JSON object with ``bssid`` (required), and optionally ``client``
    (defaults to the broadcast address), ``count`` (default 5) and
    ``interface`` (defaults to the current monitor interface).

    Returns JSON with ``status`` ``'success'`` or ``'error'`` and a ``message``.
    """
    # silent=True: a missing/invalid body yields None instead of an HTTP error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    target_bssid = data.get('bssid')
    target_client = data.get('client') or 'FF:FF:FF:FF:FF:FF'  # Broadcast by default
    interface = data.get('interface') or wifi_monitor_interface
    if not target_bssid:
        return jsonify({'status': 'error', 'message': 'Target BSSID required'})
    try:
        count = int(data.get('count', 5))
    except (TypeError, ValueError):
        return jsonify({'status': 'error', 'message': 'Count must be an integer'})
    if count < 0:
        return jsonify({'status': 'error', 'message': 'Count must not be negative'})
    if not interface:
        return jsonify({'status': 'error', 'message': 'No monitor interface'})
    if not check_tool('aireplay-ng'):
        return jsonify({'status': 'error', 'message': 'aireplay-ng not found'})
    try:
        # aireplay-ng --deauth <count> -a <bssid> -c <client> <interface>
        cmd = [
            'aireplay-ng',
            '--deauth', str(count),
            '-a', str(target_bssid),
            '-c', str(target_client),
            str(interface)
        ]
        wifi_queue.put({'type': 'info', 'text': f'Sending {count} deauth packets to {target_bssid}'})
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            return jsonify({'status': 'success', 'message': f'Sent {count} deauth packets'})
        # Fall back to stdout so the error message is not empty when the tool
        # reported the failure there instead of on stderr.
        return jsonify({'status': 'error', 'message': result.stderr or result.stdout})
    except subprocess.TimeoutExpired:
        return jsonify({'status': 'success', 'message': 'Deauth sent (timed out waiting for completion)'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@app.route('/wifi/handshake/capture', methods=['POST'])
def capture_handshake():
    """Start targeted handshake capture.

    Expects a JSON object with ``bssid`` and ``channel`` (both required) and
    optionally ``interface`` (defaults to the current monitor interface).
    airodump-ng is started locked to that BSSID/channel and writes a pcap
    under ``/tmp``.

    Returns JSON ``{'status': 'started', 'capture_file': ...}`` on success, or
    ``{'status': 'error', 'message': ...}`` otherwise.
    """
    global wifi_process
    # silent=True: a missing/invalid body yields None instead of an HTTP error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    target_bssid = data.get('bssid')
    channel = data.get('channel')
    interface = data.get('interface') or wifi_monitor_interface
    if not target_bssid or not channel:
        return jsonify({'status': 'error', 'message': 'BSSID and channel required'})
    # The BSSID becomes part of the capture file name, so only a well-formed
    # MAC address is accepted; anything else could redirect the output path.
    if not re.fullmatch(r'[0-9A-Fa-f]{2}(?::[0-9A-Fa-f]{2}){5}', str(target_bssid)):
        return jsonify({'status': 'error', 'message': 'Invalid BSSID format'})
    if not interface:
        return jsonify({'status': 'error', 'message': 'No monitor interface'})
    with wifi_lock:
        if wifi_process:
            return jsonify({'status': 'error', 'message': 'Scan already running. Stop it first.'})
        capture_path = f'/tmp/intercept_handshake_{target_bssid.replace(":", "")}'
        cmd = [
            'airodump-ng',
            '-c', str(channel),
            '--bssid', target_bssid,
            '-w', capture_path,
            '--output-format', 'pcap',
            interface
        ]
        try:
            # Nothing reads the pipes; an undrained PIPE would eventually fill
            # and block airodump-ng, so its terminal output is discarded.
            wifi_process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            wifi_queue.put({'type': 'info', 'text': f'Capturing handshakes for {target_bssid} on channel {channel}'})
            return jsonify({'status': 'started', 'capture_file': capture_path + '-01.cap'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/wifi/kismet/start', methods=['POST'])
def start_kismet():
    """Start Kismet for passive reconnaissance.

    Reads an optional JSON object with ``interface`` (defaults to the current
    monitor interface). Kismet is launched with its HTTP server bound to
    127.0.0.1:2501.

    Returns JSON ``{'status': 'started', 'api_url': ...}`` on success, or
    ``{'status': 'error', 'message': ...}`` otherwise.
    """
    global kismet_process
    # silent=True: a missing/invalid body yields None instead of an HTTP error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    interface = data.get('interface') or wifi_monitor_interface
    if not interface:
        return jsonify({'status': 'error', 'message': 'No interface specified'})
    if not check_tool('kismet'):
        return jsonify({'status': 'error', 'message': 'Kismet not found. Install with: brew install kismet'})
    with wifi_lock:
        if kismet_process:
            return jsonify({'status': 'error', 'message': 'Kismet already running'})
        try:
            # Start Kismet with REST API enabled
            cmd = [
                'kismet',
                '-c', interface,
                '--no-ncurses',
                '--override', 'httpd_bind_address=127.0.0.1',
                '--override', 'httpd_port=2501'
            ]
            # Nothing reads Kismet's console output; an undrained PIPE would
            # eventually fill and block the process, so it is discarded.
            kismet_process = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )
            wifi_queue.put({'type': 'info', 'text': 'Kismet started. API available at http://127.0.0.1:2501'})
            return jsonify({'status': 'started', 'api_url': 'http://127.0.0.1:2501'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/wifi/kismet/stop', methods=['POST'])
def stop_kismet():
    """Terminate Kismet if it is running and report the outcome as JSON."""
    global kismet_process
    with wifi_lock:
        if not kismet_process:
            return jsonify({'status': 'not_running'})
        running = kismet_process
        running.terminate()
        try:
            running.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Did not exit within five seconds: force it.
            running.kill()
        kismet_process = None
        return jsonify({'status': 'stopped'})
@app.route('/wifi/kismet/devices')
def get_kismet_devices():
    """Get devices from Kismet REST API.

    Fetches the "all devices" view from the local Kismet HTTP server with a
    five second timeout and relays the decoded JSON.

    Returns:
        JSON ``{'devices': <decoded payload>}`` on success, or
        ``{'status': 'error', 'message': ...}`` if the request or the JSON
        decoding fails.
    """
    import urllib.request
    import json as json_module

    try:
        # Kismet REST API endpoint for devices
        endpoint = 'http://127.0.0.1:2501/devices/views/all/devices.json'
        api_request = urllib.request.Request(endpoint)
        api_request.add_header('KISMET', 'admin:admin')  # Default credentials

        with urllib.request.urlopen(api_request, timeout=5) as response:
            body = response.read().decode()
            devices = json_module.loads(body)
        return jsonify({'devices': devices})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@app.route('/wifi/networks')
def get_wifi_networks():
    """Return a JSON snapshot of the WiFi state kept in module globals.

    The payload carries the values of ``wifi_networks`` and ``wifi_clients``
    as lists, the ``wifi_handshakes`` list, and the current monitor
    interface name.
    """
    snapshot = {
        'networks': list(wifi_networks.values()),
        'clients': list(wifi_clients.values()),
        'handshakes': wifi_handshakes,
        'monitor_interface': wifi_monitor_interface,
    }
    return jsonify(snapshot)
@app.route('/wifi/stream')
def stream_wifi():
    """SSE stream for WiFi events.

    Each message taken from ``wifi_queue`` is emitted as one ``data:`` frame.
    When the queue stays empty for one second a keepalive frame is sent
    instead.
    """
    def generate():
        import json

        heartbeat = json.dumps({'type': 'keepalive'})
        while True:
            try:
                event = wifi_queue.get(timeout=1)
            except queue.Empty:
                yield f"data: {heartbeat}\n\n"
            else:
                yield f"data: {json.dumps(event)}\n\n"

    sse_headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no',
        'Connection': 'keep-alive',
    }
    response = Response(generate(), mimetype='text/event-stream')
    for header_name, header_value in sse_headers.items():
        response.headers[header_name] = header_value
    return response
# ============== BLUETOOTH RECONNAISSANCE ROUTES ==============
def get_manufacturer(mac):
    """Return the manufacturer for a MAC address, or ``'Unknown'``.

    The first eight characters of ``mac``, upper-cased, are used as the key
    into ``OUI_DATABASE``.
    """
    oui = mac[:8].upper()
    try:
        return OUI_DATABASE[oui]
    except KeyError:
        return 'Unknown'
def classify_bt_device(name, device_class, services):
    """Classify a Bluetooth device into a coarse category string.

    Args:
        name: Advertised device name; ``None`` or empty is allowed.
        device_class: Integer Class-of-Device value, or a falsy value when
            unknown. Bits 8-12 are read as the major device class.
        services: Accepted for interface compatibility; not consulted.

    Returns:
        One of ``'audio'``, ``'wearable'``, ``'phone'``, ``'tracker'``,
        ``'input'``, ``'media'``, ``'computer'``, ``'imaging'`` or
        ``'other'``. Name keywords take precedence over the device class.
    """
    lowered = (name or '').lower()

    # Order matters: the first category with a matching keyword wins
    # (e.g. "headphone" is claimed by 'audio' before 'phone' is tried).
    keyword_table = (
        ('audio', ('airpod', 'earbud', 'headphone', 'speaker', 'audio',
                   'beats', 'bose', 'jbl', 'sony wh', 'sony wf')),
        ('wearable', ('watch', 'band', 'fitbit', 'garmin', 'mi band')),
        ('phone', ('iphone', 'galaxy', 'pixel', 'phone', 'android')),
        ('tracker', ('airtag', 'tile', 'smarttag', 'tracker', 'chipolo')),
        ('input', ('keyboard', 'mouse', 'controller', 'gamepad')),
        ('media', ('tv', 'roku', 'chromecast', 'firestick')),
    )
    for category, keywords in keyword_table:
        for keyword in keywords:
            if keyword in lowered:
                return category

    # Fall back to the major device class when one was supplied.
    if device_class:
        category_by_major = {
            1: 'computer',   # Computer
            2: 'phone',      # Phone
            4: 'audio',      # Audio/Video
            5: 'input',      # Peripheral
            6: 'imaging',    # Imaging
            7: 'wearable',   # Wearable
        }
        major_class = (device_class >> 8) & 0x1F
        return category_by_major.get(major_class, 'other')

    return 'other'
def detect_tracker(mac, name, manufacturer_data=None):
    """Detect if device is a known tracker (AirTag, Tile, etc).

    Args:
        mac: Device address as a colon-separated string. Only its first five
            characters (upper-cased) are compared against the prefix lists.
        name: Advertised device name; ``None`` or empty is allowed.
        manufacturer_data: Optional bytes-like manufacturer-specific
            advertisement payload. Only consulted for the AirTag check.

    Returns:
        A dict with ``type``, ``name`` and ``risk`` keys describing the
        tracker, or ``None`` when neither the address prefix nor the name
        matches.
    """
    mac_prefix = mac[:5].upper()

    def prefix_matches(prefixes):
        return any(mac_prefix.startswith(p) for p in prefixes)

    # AirTag detection (Apple Find My)
    if prefix_matches(AIRTAG_PREFIXES):
        # Match the two raw bytes 0x4C 0x00. The previous literal was
        # double-escaped (b'\\x4c\\x00'), i.e. the eight ASCII characters
        # "\x4c\x00", which binary advertisement data does not contain.
        # Presumably this is Apple's company identifier in little-endian
        # order -- TODO confirm against the caller that supplies the data.
        if manufacturer_data and b'\x4c\x00' in manufacturer_data:
            return {'type': 'airtag', 'name': 'Apple AirTag', 'risk': 'high'}

    # Tile detection
    if prefix_matches(TILE_PREFIXES):
        return {'type': 'tile', 'name': 'Tile Tracker', 'risk': 'medium'}

    # Samsung SmartTag
    if prefix_matches(SAMSUNG_TRACKER):
        return {'type': 'smarttag', 'name': 'Samsung SmartTag', 'risk': 'medium'}

    # Name-based detection
    name_lower = (name or '').lower()
    if 'airtag' in name_lower:
        return {'type': 'airtag', 'name': 'Apple AirTag', 'risk': 'high'}
    if 'tile' in name_lower:
        return {'type': 'tile', 'name': 'Tile Tracker', 'risk': 'medium'}
    if 'smarttag' in name_lower:
        return {'type': 'smarttag', 'name': 'Samsung SmartTag', 'risk': 'medium'}
    if 'chipolo' in name_lower:
        return {'type': 'chipolo', 'name': 'Chipolo Tracker', 'risk': 'medium'}

    return None
def _parse_hciconfig_output(output):
    """Turn ``hciconfig`` text output into a list of interface dicts."""
    interfaces = []
    current = None
    for line in output.split('\n'):
        if not line:
            continue
        if not line.startswith(('\t', ' ')):
            # An unindented line opens a new adapter section ("hci0: ...").
            current = {
                'name': line.split(':')[0].strip(),
                'type': 'hci',
                'status': 'up' if 'UP' in line else 'down'
            }
            interfaces.append(current)
        elif current is not None and 'UP' in line.split():
            # hciconfig prints the adapter flags ("UP RUNNING ...") on an
            # indented line below the header, so the status has to be
            # picked up here rather than from the header line alone.
            current['status'] = 'up'
    return interfaces


def detect_bt_interfaces():
    """Detect available Bluetooth interfaces.

    On Linux the adapters are read from ``hciconfig``; on macOS a single
    placeholder entry is reported. On every platform ``ubertooth-util -v``
    is probed and an ``ubertooth0`` entry is added when it exits with 0.

    Returns:
        A list of dicts with ``name``, ``type`` and ``status`` keys. The list
        is empty when nothing is found or the tools are not installed.
    """
    import platform

    interfaces = []
    system = platform.system()

    if system == 'Linux':
        try:
            # Use hciconfig to list interfaces
            result = subprocess.run(['hciconfig'], capture_output=True, text=True, timeout=5)
            interfaces.extend(_parse_hciconfig_output(result.stdout))
        except FileNotFoundError:
            # hciconfig is not installed: report no HCI adapters.
            pass
        except Exception as e:
            print(f"[BT] Error detecting interfaces: {e}")
    elif system == 'Darwin':  # macOS
        # macOS uses different Bluetooth stack
        interfaces.append({
            'name': 'default',
            'type': 'macos',
            'status': 'available'
        })

    # Check for Ubertooth
    try:
        result = subprocess.run(['ubertooth-util', '-v'], capture_output=True, timeout=5)
    except (OSError, subprocess.SubprocessError):
        # Tool missing, not executable, or timed out: best-effort probe, so
        # simply report no Ubertooth (without swallowing KeyboardInterrupt).
        pass
    else:
        if result.returncode == 0:
            interfaces.append({
                'name': 'ubertooth0',
                'type': 'ubertooth',
                'status': 'connected'
            })

    return interfaces
@app.route('/bt/interfaces')
def get_bt_interfaces():
    """Get available Bluetooth interfaces and tools.

    Returns:
        JSON with the detected ``interfaces``, a ``tools`` mapping of tool
        label to the result of ``check_tool`` for its executable, and the
        currently selected ``current_interface``.
    """
    interfaces = detect_bt_interfaces()

    # Response key -> executable name handed to check_tool().
    executables = {
        'hcitool': 'hcitool',
        'bluetoothctl': 'bluetoothctl',
        'ubertooth': 'ubertooth-scan',
        'bettercap': 'bettercap',
        'hciconfig': 'hciconfig',
        'l2ping': 'l2ping',
        'sdptool': 'sdptool',
    }
    tools = {label: check_tool(binary) for label, binary in executables.items()}

    payload = {
        'interfaces': interfaces,
        'tools': tools,
        'current_interface': bt_interface,
    }
    return jsonify(payload)
def parse_hcitool_output(line):
    """Parse one tab-separated line of ``hcitool scan`` output.

    Args:
        line: A single output line, expected as "<address>\\t<name>".

    Returns:
        ``{'mac': ..., 'name': ...}`` when the first field is 17 characters
        long and contains a colon, otherwise ``None``.
    """
    # Format: "AA:BB:CC:DD:EE:FF Device Name"
    first_field, tab, remainder = line.strip().partition('\t')
    if not tab:
        # Fewer than two tab-separated fields.
        return None

    address = first_field.strip()
    if len(address) != 17 or ':' not in address:
        return None

    # Only the second field is the name; anything after a further tab is ignored.
    device_name = remainder.split('\t', 1)[0].strip()
    return {'mac': address, 'name': device_name}
# Address followed by the rest of the line, as found in bluetoothctl output.
_BLUETOOTHCTL_DEVICE_RE = re.compile(r'([0-9A-Fa-f:]{17})\s*(.*)')


def _record_bt_device(mac, name):
    """Store one device sighting in ``bt_devices`` and publish it on ``bt_queue``."""
    import time

    device = {
        'mac': mac,
        'name': name or '[Unknown]',
        'manufacturer': get_manufacturer(mac),
        'type': classify_bt_device(name, None, None),
        'rssi': None,
        'last_seen': time.time()
    }

    # Check for tracker
    tracker = detect_tracker(mac, name)
    if tracker:
        device['tracker'] = tracker

    is_new = mac not in bt_devices
    bt_devices[mac] = device

    bt_queue.put({
        'type': 'device',
        'action': 'new' if is_new else 'update',
        **device
    })


def stream_bt_scan(process, scan_mode):
    """Stream Bluetooth scan output to queue.

    Runs in a worker thread. Reads ``process.stdout`` line by line, records
    every device it recognises, and posts status/error messages on
    ``bt_queue``. Only the ``'hcitool'`` and ``'bluetoothctl'`` modes have a
    parser; for any other mode the function just waits for the process to
    exit.

    Args:
        process: The scan subprocess whose ``stdout`` yields bytes lines.
        scan_mode: Name of the scan backend that produced ``process``.
    """
    global bt_process

    try:
        bt_queue.put({'type': 'status', 'text': 'started'})

        if scan_mode == 'hcitool':
            # hcitool lescan output
            for raw in iter(process.stdout.readline, b''):
                line = raw.decode('utf-8', errors='replace').strip()
                if not line or 'LE Scan' in line:
                    continue

                # Parse BLE device: first token is the address, the rest
                # (possibly empty) is the name.
                parts = line.split()
                if parts and ':' in parts[0]:
                    _record_bt_device(parts[0], ' '.join(parts[1:]))

        elif scan_mode == 'bluetoothctl':
            # bluetoothctl scan output
            for raw in iter(process.stdout.readline, b''):
                line = raw.decode('utf-8', errors='replace').strip()

                # Parse [NEW] Device or [CHG] Device lines. Whatever text
                # follows the address is taken as the name.
                # NOTE(review): on [CHG] lines that text may be a property
                # update rather than a name -- confirm this is intended.
                if 'Device' in line and ':' in line:
                    match = _BLUETOOTHCTL_DEVICE_RE.search(line)
                    if match:
                        _record_bt_device(match.group(1), match.group(2).strip())

    except Exception as e:
        bt_queue.put({'type': 'error', 'text': str(e)})
    finally:
        process.wait()
        bt_queue.put({'type': 'status', 'text': 'stopped'})
        with bt_lock:
            # Only clear the handle if it still refers to this scan. A scan
            # that was stopped and then restarted has already replaced it,
            # and resetting it here would make the new scan look stopped.
            if bt_process is process:
                bt_process = None
@app.route('/bt/scan/start', methods=['POST'])
def start_bt_scan():
    """Start Bluetooth scanning.

    Optional JSON body keys (all have defaults):
        mode: ``'hcitool'`` (default), ``'bluetoothctl'``, ``'ubertooth'``
            or ``'bettercap'``.
        interface: Adapter name, default ``'hci0'``. Passed to hcitool only.
        duration: Seconds, default 30. Passed to ubertooth-scan only.
        scan_ble: For hcitool, ``True`` (default) runs ``lescan`` and
            ``False`` runs a classic ``scan``.

    Returns:
        JSON ``{'status': 'started', 'mode': ..., 'interface': ...}`` once the
        scan process and its reader thread are running, otherwise
        ``{'status': 'error', 'message': ...}``.
    """
    global bt_process, bt_devices, bt_interface

    with bt_lock:
        if bt_process:
            return jsonify({'status': 'error', 'message': 'Scan already running'})

        # Every option has a default, so a missing or non-object body simply
        # means "use the defaults" instead of failing on None.get().
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        scan_mode = data.get('mode', 'hcitool')
        interface = data.get('interface', 'hci0')
        duration = data.get('duration', 30)
        scan_ble = data.get('scan_ble', True)

        # Resolve the command first so an unknown mode is rejected before
        # any previously collected results are discarded.
        if scan_mode == 'hcitool':
            if scan_ble:
                cmd = ['hcitool', '-i', interface, 'lescan', '--duplicates']
            else:
                cmd = ['hcitool', '-i', interface, 'scan']
        elif scan_mode == 'bluetoothctl':
            # Use bluetoothctl for scanning
            cmd = ['bluetoothctl']
        elif scan_mode == 'ubertooth':
            cmd = ['ubertooth-scan', '-t', str(duration)]
        elif scan_mode == 'bettercap':
            cmd = ['bettercap', '-eval', 'ble.recon on', '-silent']
        else:
            return jsonify({'status': 'error', 'message': f'Unknown scan mode: {scan_mode}'})

        bt_interface = interface
        bt_devices = {}

        # Clear queue
        while True:
            try:
                bt_queue.get_nowait()
            except queue.Empty:
                break

        # bluetoothctl is interactive and is driven through its stdin.
        interactive = scan_mode == 'bluetoothctl'

        try:
            # NOTE(review): stderr is piped but the reader thread only
            # consumes stdout -- confirm nothing else needs it.
            bt_process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE if interactive else None,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            if interactive:
                # Send scan on command
                bt_process.stdin.write(b'scan on\n')
                bt_process.stdin.flush()

            # Start streaming thread
            thread = threading.Thread(target=stream_bt_scan, args=(bt_process, scan_mode))
            thread.daemon = True
            thread.start()

            bt_queue.put({'type': 'info', 'text': f'Started {scan_mode} scan on {interface}'})
            return jsonify({'status': 'started', 'mode': scan_mode, 'interface': interface})

        except FileNotFoundError as e:
            return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
@app.route('/bt/scan/stop', methods=['POST'])
def stop_bt_scan():
    """Stop Bluetooth scanning.

    Sends a terminate request to the scan process, waits up to three seconds
    for it to exit, and kills it if the wait times out.

    Returns:
        JSON ``{'status': 'stopped'}`` when a scan was running, otherwise
        ``{'status': 'not_running'}``.
    """
    global bt_process

    with bt_lock:
        active = bt_process
        if not active:
            return jsonify({'status': 'not_running'})

        active.terminate()
        try:
            active.wait(timeout=3)
        except subprocess.TimeoutExpired:
            # Did not exit in time after terminate(); force it.
            active.kill()
        bt_process = None
        return jsonify({'status': 'stopped'})
@app.route('/bt/enum', methods=['POST'])
def enum_bt_services():
    """Enumerate services on a Bluetooth device.

    Expects a JSON body with a ``mac`` key, runs ``sdptool browse`` against
    it (30 second timeout) and parses the output into a list of service
    dicts. The list is also cached in ``bt_services`` under the address.

    Returns:
        JSON ``{'status': 'success', 'mac': ..., 'services': [...]}`` or
        ``{'status': 'error', 'message': ...}``.
    """
    payload = request.json
    target_mac = payload.get('mac')

    if not target_mac:
        return jsonify({'status': 'error', 'message': 'Target MAC required'})

    # "Label:" prefix -> key stored in the service record.
    labelled_fields = (
        ('Service Description:', 'description'),
        ('Service Provider:', 'provider'),
    )

    try:
        # Try sdptool for classic BT
        completed = subprocess.run(
            ['sdptool', 'browse', target_mac],
            capture_output=True, text=True, timeout=30
        )

        services = []
        record = {}
        for raw_line in completed.stdout.split('\n'):
            text = raw_line.strip()

            if text.startswith('Service Name:'):
                # A name line starts a new record; flush the previous one.
                if record:
                    services.append(record)
                record = {'name': text.split(':', 1)[1].strip()}
                continue

            for prefix, key in labelled_fields:
                if text.startswith(prefix):
                    record[key] = text.split(':', 1)[1].strip()
                    break
            else:
                if 'Protocol Descriptor' in text:
                    record['protocol'] = text

        if record:
            services.append(record)

        bt_services[target_mac] = services

        return jsonify({
            'status': 'success',
            'mac': target_mac,
            'services': services
        })

    except subprocess.TimeoutExpired:
        return jsonify({'status': 'error', 'message': 'Connection timed out'})
    except FileNotFoundError:
        return jsonify({'status': 'error', 'message': 'sdptool not found'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@app.route('/bt/ping', methods=['POST'])
def ping_bt_device():
    """Ping a Bluetooth device using l2ping.

    Expects a JSON body with ``mac`` and an optional ``count`` (default 5).
    The command is given 30 seconds to finish.

    Returns:
        JSON ``{'status': 'success', 'output': <stdout>, 'reachable': <bool>}``
        where ``reachable`` is true when l2ping exited with 0, or
        ``{'status': 'error', 'message': ...}``.
    """
    payload = request.json
    target_mac = payload.get('mac')
    count = payload.get('count', 5)

    if not target_mac:
        return jsonify({'status': 'error', 'message': 'Target MAC required'})

    try:
        command = ['l2ping', '-c', str(count), target_mac]
        completed = subprocess.run(command, capture_output=True, text=True, timeout=30)
        reachable = completed.returncode == 0

        return jsonify({
            'status': 'success',
            'output': completed.stdout,
            'reachable': reachable
        })

    except subprocess.TimeoutExpired:
        return jsonify({'status': 'error', 'message': 'Ping timed out'})
    except FileNotFoundError:
        return jsonify({'status': 'error', 'message': 'l2ping not found'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@app.route('/bt/dos', methods=['POST'])
def dos_bt_device():
    """Flood ping a Bluetooth device (DoS test).

    Expects a JSON body with ``mac`` and optional ``count`` (default 100) and
    ``size`` (default 600). The command is given 60 seconds to finish; a
    timeout is reported with a ``'success'`` status.

    Returns:
        JSON ``{'status': 'success', 'output': <stdout>}`` on completion, a
        ``'success'`` status with a message on timeout, or
        ``{'status': 'error', 'message': ...}``.
    """
    payload = request.json
    target_mac = payload.get('mac')
    count = payload.get('count', 100)
    size = payload.get('size', 600)

    if not target_mac:
        return jsonify({'status': 'error', 'message': 'Target MAC required'})

    try:
        # l2ping flood with large packets
        command = ['l2ping', '-c', str(count), '-s', str(size), '-f', target_mac]
        completed = subprocess.run(command, capture_output=True, text=True, timeout=60)

        bt_queue.put({'type': 'info', 'text': f'DoS test complete on {target_mac}'})

        return jsonify({
            'status': 'success',
            'output': completed.stdout
        })

    except subprocess.TimeoutExpired:
        return jsonify({'status': 'success', 'message': 'DoS test timed out (expected)'})
    except FileNotFoundError:
        return jsonify({'status': 'error', 'message': 'l2ping not found'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@app.route('/bt/devices')
def get_bt_devices():
    """Return a JSON snapshot of the Bluetooth state kept in module globals.

    The payload carries the values of ``bt_devices`` and ``bt_beacons`` as
    lists together with the current ``bt_interface``.
    """
    snapshot = {
        'devices': list(bt_devices.values()),
        'beacons': list(bt_beacons.values()),
        'interface': bt_interface,
    }
    return jsonify(snapshot)
@app.route('/bt/stream')
def stream_bt():
    """SSE stream for Bluetooth events.

    Each message taken from ``bt_queue`` is emitted as one ``data:`` frame.
    When the queue stays empty for one second a keepalive frame is sent
    instead.
    """
    def generate():
        import json

        heartbeat = json.dumps({'type': 'keepalive'})
        while True:
            try:
                event = bt_queue.get(timeout=1)
            except queue.Empty:
                yield f"data: {heartbeat}\n\n"
            else:
                yield f"data: {json.dumps(event)}\n\n"

    sse_headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no',
        'Connection': 'keep-alive',
    }
    response = Response(generate(), mimetype='text/event-stream')
    for header_name, header_value in sse_headers.items():
        response.headers[header_name] = header_value
    return response
def main():
    """Print the startup banner and run the Flask app.

    The server listens on all interfaces (``0.0.0.0``) on port 5050 with
    debug mode off and threading on.
    """
    rule = "=" * 50
    banner = (
        rule,
        " INTERCEPT // Signal Intelligence",
        " POCSAG / FLEX / 433MHz / WiFi / Bluetooth",
        rule,
        "",
        "Open http://localhost:5050 in your browser",
        "",
        "Press Ctrl+C to stop",
        "",
    )
    for banner_line in banner:
        print(banner_line)

    app.run(host='0.0.0.0', port=5050, debug=False, threaded=True)


if __name__ == '__main__':
    main()