Files
intercept/routes/bluetooth.py
James Smith ddeff002c9 Refactor into modular structure with improvements
- Split monolithic intercept.py (15k lines) into modular structure:
  - routes/ - Flask blueprints for each feature
  - templates/ - Jinja2 HTML templates
  - data/ - OUI database, satellite TLEs, detection patterns
  - utils/ - dependencies, process management, logging
  - config.py - centralized configuration with env var support

- Add type hints to function signatures
- Replace bare except clauses with specific exceptions
- Add proper logging module (replaces print statements)
- Add environment variable support (INTERCEPT_* prefix)
- Add test suite with pytest
- Add Dockerfile for containerized deployment
- Add pyproject.toml with ruff/black/mypy config
- Add requirements-dev.txt for development dependencies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-23 16:28:48 +00:00

484 lines
18 KiB
Python

"""Bluetooth reconnaissance routes."""
from __future__ import annotations
import fcntl
import json
import os
import platform
import pty
import queue
import re
import select
import subprocess
import threading
import time
from typing import Any, Generator
from flask import Blueprint, jsonify, request, Response
import app as app_module
from utils.dependencies import check_tool
from data.oui import OUI_DATABASE, load_oui_database, get_manufacturer
from data.patterns import AIRTAG_PREFIXES, TILE_PREFIXES, SAMSUNG_TRACKER
# Flask blueprint that groups every route in this module under the /bt URL prefix.
bluetooth_bp = Blueprint('bluetooth', __name__, url_prefix='/bt')
def classify_bt_device(name, device_class, services, manufacturer=None):
    """Classify a Bluetooth device into a coarse category.

    Checks run from most to least specific: tracker names, other name
    patterns, manufacturer name, and finally the major device class encoded
    in ``device_class``.

    Args:
        name: Advertised device name; ``None`` or ``''`` when unknown.
        device_class: Integer Class of Device value, or ``None``/``0`` when
            unavailable. Bits 8-12 are read as the major class.
        services: Currently unused; kept so existing callers keep working.
        manufacturer: Manufacturer name (for example from an OUI lookup),
            or ``None``.

    Returns:
        One of ``'tracker'``, ``'audio'``, ``'wearable'``, ``'phone'``,
        ``'input'``, ``'computer'`` or ``'other'``.
    """
    name_lower = (name or '').lower()
    mfr_lower = (manufacturer or '').lower()

    # Tracker names are tested first: a name such as "Galaxy SmartTag" also
    # contains the phone pattern 'galaxy' and would otherwise be reported as
    # a phone. 'tile' must start a word so names like "Versatile" do not match.
    if re.search(r'airtag|\btile|smarttag|chipolo|find my', name_lower):
        return 'tracker'

    # Ordered (category, substrings) rules; the first category with a hit wins.
    name_rules = (
        ('audio', (
            'airpod', 'earbud', 'headphone', 'headset', 'speaker', 'audio', 'beats', 'bose',
            'jbl', 'sony wh', 'sony wf', 'sennheiser', 'jabra', 'soundcore', 'anker', 'buds',
            'earphone', 'pod', 'soundbar', 'skullcandy', 'marshall', 'b&o', 'bang', 'olufsen',
        )),
        ('wearable', (
            'watch', 'band', 'fitbit', 'garmin', 'mi band', 'miband', 'amazfit',
            'galaxy watch', 'gear', 'versa', 'sense', 'charge', 'inspire',
        )),
        ('phone', (
            'iphone', 'galaxy', 'pixel', 'phone', 'android', 'oneplus', 'huawei', 'xiaomi',
        )),
        ('input', ('keyboard', 'mouse', 'controller', 'gamepad', 'remote')),
    )
    for category, patterns in name_rules:
        if any(pattern in name_lower for pattern in patterns):
            return category

    # Fall back to an exact (case-insensitive) manufacturer name match.
    manufacturer_categories = {
        'bose': 'audio', 'jbl': 'audio', 'sony': 'audio',
        'sennheiser': 'audio', 'jabra': 'audio', 'beats': 'audio',
        'fitbit': 'wearable', 'garmin': 'wearable',
        'tile': 'tracker',
    }
    if mfr_lower in manufacturer_categories:
        return manufacturer_categories[mfr_lower]

    if device_class:
        # Major device class lives in bits 8-12 of the Class of Device value.
        major_class = (device_class >> 8) & 0x1F
        major_categories = {
            1: 'computer',
            2: 'phone',
            4: 'audio',
            5: 'input',
            7: 'wearable',
        }
        if major_class in major_categories:
            return major_categories[major_class]

    return 'other'
def detect_tracker(mac, name, manufacturer_data=None):
    """Detect whether a device looks like a known location tracker.

    Args:
        mac: Device MAC address string; ``None`` or ``''`` is tolerated and
            simply disables the prefix checks.
        name: Advertised device name, or ``None``/``''`` when unknown.
        manufacturer_data: Optional bytes-like manufacturer data. It is only
            consulted for the AirTag prefix check.

    Returns:
        A dict with ``'type'``, ``'name'`` and ``'risk'`` keys describing the
        tracker, or ``None`` when nothing matched.
    """
    # Only the first five characters of the MAC are compared to the prefix lists.
    mac_prefix = (mac or '')[:5].upper()

    def has_prefix(prefixes):
        return any(mac_prefix.startswith(prefix) for prefix in prefixes)

    # Apple's company identifier 0x004C as it appears little-endian in
    # manufacturer data. The previous literal b'\\x4c\\x00' spelled out the
    # backslash characters (8 bytes of text) and could not match raw bytes;
    # it is still accepted in case a caller passes escaped text.
    apple_markers = (b'\x4c\x00', b'\\x4c\\x00')

    if has_prefix(AIRTAG_PREFIXES) and manufacturer_data:
        if any(marker in manufacturer_data for marker in apple_markers):
            return {'type': 'airtag', 'name': 'Apple AirTag', 'risk': 'high'}

    if has_prefix(TILE_PREFIXES):
        return {'type': 'tile', 'name': 'Tile Tracker', 'risk': 'medium'}

    if has_prefix(SAMSUNG_TRACKER):
        return {'type': 'smarttag', 'name': 'Samsung SmartTag', 'risk': 'medium'}

    # No prefix hit: fall back to the advertised name.
    name_lower = (name or '').lower()
    if 'airtag' in name_lower:
        return {'type': 'airtag', 'name': 'Apple AirTag', 'risk': 'high'}
    if 'tile' in name_lower:
        return {'type': 'tile', 'name': 'Tile Tracker', 'risk': 'medium'}

    return None
def detect_bt_interfaces():
    """Detect available Bluetooth interfaces.

    On Linux the output of ``hciconfig`` is parsed into one entry per
    ``hciN`` adapter. On macOS a single placeholder entry is returned. Any
    other platform yields an empty list.

    Returns:
        A list of dicts with ``'name'``, ``'type'`` and ``'status'`` keys.
        Detection is best-effort: if ``hciconfig`` cannot be run, the list
        is empty rather than an exception being raised.
    """
    system = platform.system()

    if system == 'Darwin':
        return [{
            'name': 'default',
            'type': 'macos',
            'status': 'available'
        }]
    if system != 'Linux':
        return []

    try:
        result = subprocess.run(['hciconfig'], capture_output=True, text=True, timeout=5)
    except (OSError, subprocess.SubprocessError, ValueError):
        # hciconfig is missing or not executable (OSError), timed out
        # (SubprocessError) or printed undecodable text (ValueError).
        # Anything else is a programming error and should surface.
        return []

    interfaces = []
    # Split just before every line that starts an "hciN:" section.
    for block in re.split(r'(?=^hci\d+:)', result.stdout, flags=re.MULTILINE):
        match = re.match(r'(hci\d+):', block)
        if not match:
            continue
        is_up = 'UP RUNNING' in block or '\tUP ' in block
        interfaces.append({
            'name': match.group(1),
            'type': 'hci',
            'status': 'up' if is_up else 'down'
        })
    return interfaces
# Colour escape sequences that bluetoothctl writes to its terminal.
_BT_ANSI_COLOR_RE = re.compile(r'\x1b\[[0-9;]*m')
# A MAC address followed by whatever text remains on the line.
_BT_DEVICE_LINE_RE = re.compile(r'((?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2})\s*(.*)')


def _record_bt_device(mac, name):
    """Store one device sighting and queue the matching 'device' event."""
    manufacturer = get_manufacturer(mac)
    device = {
        'mac': mac,
        'name': name or '[Unknown]',
        'manufacturer': manufacturer,
        'type': classify_bt_device(name, None, None, manufacturer),
        'rssi': None,
        'last_seen': time.time()
    }
    tracker = detect_tracker(mac, name)
    if tracker:
        device['tracker'] = tracker

    is_new = mac not in app_module.bt_devices
    app_module.bt_devices[mac] = device
    # In the queued event 'type' marks the message kind, so the device
    # classification is carried separately as 'device_type'.
    app_module.bt_queue.put({
        **device,
        'type': 'device',
        'device_type': device.get('type', 'other'),
        'action': 'new' if is_new else 'update',
    })


def stream_bt_scan(process, scan_mode):
    """Stream Bluetooth scan output to the event queue.

    Meant to run in a background thread. It reads scanner output until the
    process ends, records every device seen in ``app_module.bt_devices`` and
    pushes status, device and error messages onto ``app_module.bt_queue``.

    Args:
        process: The running scanner ``subprocess.Popen``. In
            ``'bluetoothctl'`` mode it is expected to carry a ``_master_fd``
            attribute holding the pty master descriptor to read from.
        scan_mode: ``'hcitool'`` (line-oriented stdout pipe) or
            ``'bluetoothctl'`` (pty output). Any other value reads nothing.
    """
    master_fd = getattr(process, '_master_fd', None)
    try:
        app_module.bt_queue.put({'type': 'status', 'text': 'started'})

        if scan_mode == 'hcitool':
            for raw_line in iter(process.stdout.readline, b''):
                line = raw_line.decode('utf-8', errors='replace').strip()
                if not line or 'LE Scan' in line:
                    continue
                parts = line.split()
                # Device lines start with the MAC; the rest is the name.
                if len(parts) >= 1 and ':' in parts[0]:
                    _record_bt_device(parts[0], ' '.join(parts[1:]))

        elif scan_mode == 'bluetoothctl':
            # Compare against None: 0 is a valid file descriptor.
            if master_fd is None:
                app_module.bt_queue.put({'type': 'error', 'text': 'bluetoothctl pty not available'})
                return

            buffer = ''
            while process.poll() is None:
                # Wake up every second so process exit is noticed.
                readable, _, _ = select.select([master_fd], [], [], 1.0)
                if not readable:
                    continue
                try:
                    data = os.read(master_fd, 4096)
                except OSError:
                    break
                if not data:
                    break

                buffer += data.decode('utf-8', errors='replace')
                # Handle complete lines only; keep the trailing partial line.
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    line = _BT_ANSI_COLOR_RE.sub('', line.strip()).replace('\r', '')
                    if 'Device' not in line:
                        continue
                    # NOTE(review): any line with 'Device' and a MAC counts as
                    # a sighting and the rest of the line becomes the name;
                    # presumably this also matches property-change lines.
                    # Confirm against real bluetoothctl output.
                    match = _BT_DEVICE_LINE_RE.search(line)
                    if match:
                        _record_bt_device(match.group(1).upper(), match.group(2).strip())

    except Exception as e:
        app_module.bt_queue.put({'type': 'error', 'text': str(e)})
    finally:
        # Close the pty here so it is released even when the loop above
        # exits through an exception.
        if master_fd is not None:
            try:
                os.close(master_fd)
            except OSError:
                pass
        process.wait()
        app_module.bt_queue.put({'type': 'status', 'text': 'stopped'})
        with app_module.bt_lock:
            # Only clear the slot if it still refers to this scan; a newer
            # scan may have been started since this one was stopped.
            if app_module.bt_process is process:
                app_module.bt_process = None
@bluetooth_bp.route('/reload-oui', methods=['POST'])
def reload_oui_database_route():
    """Reload the OUI database via ``load_oui_database()``.

    Returns:
        JSON with ``status: 'success'`` and the new entry count, or
        ``status: 'error'`` when the loader returned nothing.
    """
    fresh_entries = load_oui_database()
    if not fresh_entries:
        return jsonify({'status': 'error', 'message': 'Could not load oui_database.json'})

    # Mutate the imported mapping in place rather than rebinding the name.
    OUI_DATABASE.clear()
    OUI_DATABASE.update(fresh_entries)
    return jsonify({'status': 'success', 'entries': len(OUI_DATABASE)})
@bluetooth_bp.route('/interfaces')
def get_bt_interfaces():
    """Report detected Bluetooth interfaces and helper-tool availability.

    Returns:
        JSON with the detected ``interfaces``, a ``tools`` mapping of tool
        name to the result of ``check_tool``, and the currently selected
        ``current_interface``.
    """
    detected = detect_bt_interfaces()
    tool_names = ('hcitool', 'bluetoothctl', 'hciconfig', 'l2ping', 'sdptool')
    availability = {tool: check_tool(tool) for tool in tool_names}

    return jsonify({
        'interfaces': detected,
        'tools': availability,
        'current_interface': app_module.bt_interface
    })
@bluetooth_bp.route('/scan/start', methods=['POST'])
def start_bt_scan():
    """Start Bluetooth scanning.

    Optional JSON body fields:
        mode: ``'hcitool'`` (default) or ``'bluetoothctl'``.
        interface: Adapter name handed to hcitool; defaults to ``'hci0'``.
        scan_ble: When true (default), hcitool runs ``lescan --duplicates``
            instead of a classic ``scan``.

    Returns:
        JSON with ``status: 'started'`` on success, otherwise
        ``status: 'error'`` plus a ``message``.
    """
    with app_module.bt_lock:
        if app_module.bt_process:
            if app_module.bt_process.poll() is None:
                return jsonify({'status': 'error', 'message': 'Scan already running'})
            # The previous scanner exited on its own; forget it.
            app_module.bt_process = None

        # Tolerate a missing or non-JSON body: every field has a default.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        scan_mode = data.get('mode', 'hcitool')
        interface = data.get('interface', 'hci0')
        scan_ble = data.get('scan_ble', True)

        # Reject unknown modes before any existing state is discarded.
        if scan_mode not in ('hcitool', 'bluetoothctl'):
            return jsonify({'status': 'error', 'message': f'Unknown scan mode: {scan_mode}'})

        app_module.bt_interface = interface
        app_module.bt_devices = {}

        # Drop events left over from a previous scan.
        while not app_module.bt_queue.empty():
            try:
                app_module.bt_queue.get_nowait()
            except queue.Empty:
                break

        master_fd = None
        reader_started = False
        try:
            if scan_mode == 'hcitool':
                if scan_ble:
                    cmd = ['hcitool', '-i', interface, 'lescan', '--duplicates']
                else:
                    cmd = ['hcitool', '-i', interface, 'scan']
                app_module.bt_process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE
                )
            else:
                # bluetoothctl is driven through a pseudo-terminal.
                master_fd, slave_fd = pty.openpty()
                try:
                    app_module.bt_process = subprocess.Popen(
                        ['bluetoothctl'],
                        stdin=slave_fd,
                        stdout=slave_fd,
                        stderr=slave_fd,
                        close_fds=True
                    )
                finally:
                    # The child holds its own copy; also avoids a leak when
                    # Popen itself fails.
                    os.close(slave_fd)
                app_module.bt_process._master_fd = master_fd

                time.sleep(0.5)
                os.write(master_fd, b'power on\n')
                time.sleep(0.3)
                os.write(master_fd, b'scan on\n')

            # Give the scanner a moment so an immediate failure is reported.
            time.sleep(0.5)

            process = app_module.bt_process
            if process.poll() is not None:
                # In bluetoothctl mode stderr goes to the pty, so there is no
                # stderr pipe to read from.
                stderr_output = ''
                if process.stderr is not None:
                    stderr_output = process.stderr.read().decode('utf-8', errors='replace').strip()
                app_module.bt_process = None
                return jsonify({'status': 'error', 'message': stderr_output or 'Process failed to start'})

            thread = threading.Thread(target=stream_bt_scan, args=(process, scan_mode))
            thread.daemon = True
            thread.start()
            reader_started = True

            app_module.bt_queue.put({'type': 'info', 'text': f'Started {scan_mode} scan on {interface}'})
            return jsonify({'status': 'started', 'mode': scan_mode, 'interface': interface})

        except FileNotFoundError as e:
            return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)})
        finally:
            # Without a reader thread nobody else will close the pty master.
            if not reader_started and master_fd is not None:
                try:
                    os.close(master_fd)
                except OSError:
                    pass
@bluetooth_bp.route('/scan/stop', methods=['POST'])
def stop_bt_scan():
    """Stop the running Bluetooth scan, if there is one.

    Returns:
        JSON ``{'status': 'stopped'}`` after the scanner was terminated, or
        ``{'status': 'not_running'}`` when no scanner was registered.
    """
    with app_module.bt_lock:
        scanner = app_module.bt_process
        if not scanner:
            return jsonify({'status': 'not_running'})

        # Ask politely first; kill if it has not exited within 3 seconds.
        scanner.terminate()
        try:
            scanner.wait(timeout=3)
        except subprocess.TimeoutExpired:
            scanner.kill()

        app_module.bt_process = None
        return jsonify({'status': 'stopped'})
@bluetooth_bp.route('/reset', methods=['POST'])
def reset_bt_adapter():
    """Reset a Bluetooth adapter.

    Stops any registered scanner, kills stray ``hcitool``/``bluetoothctl``
    processes, unblocks Bluetooth via ``rfkill`` and cycles the adapter
    down and up with ``hciconfig``.

    Optional JSON body fields:
        interface: Adapter to reset; defaults to ``'hci0'``.

    Returns:
        JSON with ``status`` ``'success'`` (adapter reports UP RUNNING),
        ``'warning'`` (reset ran but adapter not reported up) or ``'error'``,
        plus a ``message``.
    """
    # Tolerate a missing or non-JSON body: the only field has a default.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    interface = data.get('interface', 'hci0')

    with app_module.bt_lock:
        if app_module.bt_process:
            try:
                app_module.bt_process.terminate()
                app_module.bt_process.wait(timeout=2)
            except (subprocess.TimeoutExpired, OSError):
                try:
                    app_module.bt_process.kill()
                except OSError:
                    pass
            app_module.bt_process = None

    try:
        # Clear out scanners that were not started through this module.
        subprocess.run(['pkill', '-f', 'hcitool'], capture_output=True, timeout=2)
        subprocess.run(['pkill', '-f', 'bluetoothctl'], capture_output=True, timeout=2)
        time.sleep(0.5)

        subprocess.run(['rfkill', 'unblock', 'bluetooth'], capture_output=True, timeout=5)

        # Power-cycle the adapter, pausing between the steps.
        subprocess.run(['hciconfig', interface, 'down'], capture_output=True, timeout=5)
        time.sleep(1)
        subprocess.run(['hciconfig', interface, 'up'], capture_output=True, timeout=5)
        time.sleep(0.5)

        result = subprocess.run(['hciconfig', interface], capture_output=True, text=True, timeout=5)
        is_up = 'UP RUNNING' in result.stdout

        if is_up:
            status, message = 'success', f'Adapter {interface} reset'
        else:
            status, message = 'warning', 'Reset attempted but adapter may still be down'
        return jsonify({
            'status': status,
            'message': message,
            'is_up': is_up
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@bluetooth_bp.route('/enum', methods=['POST'])
def enum_bt_services():
    """Enumerate services on a Bluetooth device with ``sdptool browse``.

    JSON body fields:
        mac: Target address in colon-separated ``AA:BB:CC:DD:EE:FF`` form
            (required).

    Returns:
        JSON with ``status: 'success'``, the ``mac`` and a ``services`` list
        (dicts with ``name`` and, when present, ``description``), or
        ``status: 'error'`` plus a ``message``. Successful results are also
        stored in ``app_module.bt_services`` keyed by MAC.
    """
    # Tolerate a missing or non-JSON body so the caller gets a clean error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    target_mac = data.get('mac')

    if not target_mac:
        return jsonify({'status': 'error', 'message': 'Target MAC required'})

    # The value comes from the client and is passed to sdptool as an
    # argument; insist on a real MAC so it cannot be read as an option.
    if isinstance(target_mac, str):
        target_mac = target_mac.strip()
    if not isinstance(target_mac, str) or not re.fullmatch(
            r'(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}', target_mac):
        return jsonify({'status': 'error', 'message': 'Invalid MAC address'})

    try:
        result = subprocess.run(
            ['sdptool', 'browse', target_mac],
            capture_output=True, text=True, timeout=30
        )

        services = []
        current_service = {}

        for line in result.stdout.split('\n'):
            line = line.strip()
            if line.startswith('Service Name:'):
                # A new name line closes the record collected so far.
                if current_service:
                    services.append(current_service)
                current_service = {'name': line.split(':', 1)[1].strip()}
            elif line.startswith('Service Description:'):
                current_service['description'] = line.split(':', 1)[1].strip()

        # Flush the last record.
        if current_service:
            services.append(current_service)

        app_module.bt_services[target_mac] = services

        return jsonify({
            'status': 'success',
            'mac': target_mac,
            'services': services
        })

    except subprocess.TimeoutExpired:
        return jsonify({'status': 'error', 'message': 'Connection timed out'})
    except FileNotFoundError:
        return jsonify({'status': 'error', 'message': 'sdptool not found'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
@bluetooth_bp.route('/devices')
def get_bt_devices():
    """Return the Bluetooth devices and beacons discovered so far.

    Returns:
        JSON with ``devices`` and ``beacons`` lists (the values of the
        corresponding ``app_module`` mappings) and the current ``interface``.
    """
    snapshot = {
        'devices': list(app_module.bt_devices.values()),
        'beacons': list(app_module.bt_beacons.values()),
        'interface': app_module.bt_interface
    }
    return jsonify(snapshot)
@bluetooth_bp.route('/stream')
def stream_bt():
    """Serve queued Bluetooth events as a Server-Sent Events stream.

    Each message taken from ``app_module.bt_queue`` is sent as one
    ``data:`` frame. When nothing arrives within a second, a keepalive
    frame is sent instead.
    """
    def generate():
        while True:
            try:
                payload = app_module.bt_queue.get(timeout=1)
            except queue.Empty:
                # Nothing queued within the timeout: emit a keepalive frame.
                payload = {'type': 'keepalive'}
            yield f"data: {json.dumps(payload)}\n\n"

    sse_response = Response(generate(), mimetype='text/event-stream')
    extra_headers = (
        ('Cache-Control', 'no-cache'),
        ('X-Accel-Buffering', 'no'),
        ('Connection', 'keep-alive'),
    )
    for header_name, header_value in extra_headers:
        sse_response.headers[header_name] = header_value
    return sse_response