mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
Fix Bluetooth interface status detection and bluetoothctl scanning
- Fix hciconfig parsing to check for "UP RUNNING" in the full interface block, not just the first line - Use pty for bluetoothctl to get proper output (interactive tools need a pseudo-terminal) - Strip ANSI escape codes from bluetoothctl output - Properly read from pty master_fd with select() for non-blocking I/O 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
140
intercept.py
140
intercept.py
@@ -5553,17 +5553,25 @@ def detect_bt_interfaces():
|
||||
try:
|
||||
# Use hciconfig to list interfaces
|
||||
result = subprocess.run(['hciconfig'], capture_output=True, text=True, timeout=5)
|
||||
lines = result.stdout.split('\n')
|
||||
current_iface = None
|
||||
for line in lines:
|
||||
if line and not line.startswith('\t') and not line.startswith(' '):
|
||||
parts = line.split(':')
|
||||
if parts:
|
||||
current_iface = parts[0].strip()
|
||||
output = result.stdout
|
||||
|
||||
# Parse hciconfig output - "UP RUNNING" appears on a separate line
|
||||
import re
|
||||
# Split by interface blocks
|
||||
blocks = re.split(r'(?=^hci\d+:)', output, flags=re.MULTILINE)
|
||||
for block in blocks:
|
||||
if block.strip():
|
||||
# Get interface name from first line
|
||||
first_line = block.split('\n')[0]
|
||||
match = re.match(r'(hci\d+):', first_line)
|
||||
if match:
|
||||
iface_name = match.group(1)
|
||||
# Check if UP appears anywhere in the block
|
||||
is_up = 'UP RUNNING' in block or '\tUP ' in block
|
||||
interfaces.append({
|
||||
'name': current_iface,
|
||||
'name': iface_name,
|
||||
'type': 'hci',
|
||||
'status': 'up' if 'UP' in line else 'down'
|
||||
'status': 'up' if is_up else 'down'
|
||||
})
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
@@ -5681,39 +5689,72 @@ def stream_bt_scan(process, scan_mode):
|
||||
})
|
||||
|
||||
elif scan_mode == 'bluetoothctl':
|
||||
# bluetoothctl scan output
|
||||
for line in iter(process.stdout.readline, b''):
|
||||
line = line.decode('utf-8', errors='replace').strip()
|
||||
# bluetoothctl scan output - read from pty
|
||||
import os
|
||||
import select
|
||||
|
||||
# Parse [NEW] Device or [CHG] Device lines
|
||||
if 'Device' in line and ':' in line:
|
||||
import re
|
||||
match = re.search(r'([0-9A-Fa-f:]{17})\s*(.*)', line)
|
||||
if match:
|
||||
mac = match.group(1)
|
||||
name = match.group(2).strip()
|
||||
master_fd = getattr(process, '_master_fd', None)
|
||||
if not master_fd:
|
||||
bt_queue.put({'type': 'error', 'text': 'bluetoothctl pty not available'})
|
||||
return
|
||||
|
||||
device = {
|
||||
'mac': mac,
|
||||
'name': name or '[Unknown]',
|
||||
'manufacturer': get_manufacturer(mac),
|
||||
'type': classify_bt_device(name, None, None),
|
||||
'rssi': None,
|
||||
'last_seen': time.time()
|
||||
}
|
||||
buffer = ''
|
||||
while process.poll() is None:
|
||||
# Check if data available
|
||||
readable, _, _ = select.select([master_fd], [], [], 1.0)
|
||||
if readable:
|
||||
try:
|
||||
data = os.read(master_fd, 4096)
|
||||
if not data:
|
||||
break
|
||||
buffer += data.decode('utf-8', errors='replace')
|
||||
|
||||
tracker = detect_tracker(mac, name)
|
||||
if tracker:
|
||||
device['tracker'] = tracker
|
||||
# Process complete lines
|
||||
while '\n' in buffer:
|
||||
line, buffer = buffer.split('\n', 1)
|
||||
line = line.strip()
|
||||
|
||||
is_new = mac not in bt_devices
|
||||
bt_devices[mac] = device
|
||||
# Remove ANSI escape codes
|
||||
import re
|
||||
line = re.sub(r'\x1b\[[0-9;]*m', '', line)
|
||||
line = re.sub(r'\x1b\[\?.*?[a-zA-Z]', '', line)
|
||||
|
||||
bt_queue.put({
|
||||
'type': 'device',
|
||||
'action': 'new' if is_new else 'update',
|
||||
**device
|
||||
})
|
||||
# Parse [NEW] Device or [CHG] Device lines
|
||||
if 'Device' in line and ':' in line:
|
||||
match = re.search(r'([0-9A-Fa-f:]{17})\s*(.*)', line)
|
||||
if match:
|
||||
mac = match.group(1)
|
||||
name = match.group(2).strip()
|
||||
|
||||
device = {
|
||||
'mac': mac,
|
||||
'name': name or '[Unknown]',
|
||||
'manufacturer': get_manufacturer(mac),
|
||||
'type': classify_bt_device(name, None, None),
|
||||
'rssi': None,
|
||||
'last_seen': time.time()
|
||||
}
|
||||
|
||||
tracker = detect_tracker(mac, name)
|
||||
if tracker:
|
||||
device['tracker'] = tracker
|
||||
|
||||
is_new = mac not in bt_devices
|
||||
bt_devices[mac] = device
|
||||
|
||||
bt_queue.put({
|
||||
'type': 'device',
|
||||
'action': 'new' if is_new else 'update',
|
||||
**device
|
||||
})
|
||||
except OSError:
|
||||
break
|
||||
|
||||
# Close master_fd
|
||||
try:
|
||||
os.close(master_fd)
|
||||
except:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
bt_queue.put({'type': 'error', 'text': str(e)})
|
||||
@@ -5785,17 +5826,26 @@ def start_bt_scan():
|
||||
)
|
||||
|
||||
elif scan_mode == 'bluetoothctl':
|
||||
# Use bluetoothctl for scanning
|
||||
cmd = ['bluetoothctl']
|
||||
# Use bluetoothctl for scanning with stdbuf to unbuffer output
|
||||
# Or use script command to provide a pty
|
||||
import pty
|
||||
import os
|
||||
|
||||
master_fd, slave_fd = pty.openpty()
|
||||
bt_process = subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE
|
||||
['bluetoothctl'],
|
||||
stdin=slave_fd,
|
||||
stdout=slave_fd,
|
||||
stderr=slave_fd,
|
||||
close_fds=True
|
||||
)
|
||||
os.close(slave_fd)
|
||||
|
||||
# Store master_fd for reading
|
||||
bt_process._master_fd = master_fd
|
||||
|
||||
# Send scan on command
|
||||
bt_process.stdin.write(b'scan on\n')
|
||||
bt_process.stdin.flush()
|
||||
os.write(master_fd, b'scan on\n')
|
||||
|
||||
elif scan_mode == 'ubertooth':
|
||||
cmd = ['ubertooth-scan', '-t', str(duration)]
|
||||
|
||||
Reference in New Issue
Block a user