diff --git a/intercept.py b/intercept.py index 4c4195e..fa40b50 100755 --- a/intercept.py +++ b/intercept.py @@ -5553,17 +5553,25 @@ def detect_bt_interfaces(): try: # Use hciconfig to list interfaces result = subprocess.run(['hciconfig'], capture_output=True, text=True, timeout=5) - lines = result.stdout.split('\n') - current_iface = None - for line in lines: - if line and not line.startswith('\t') and not line.startswith(' '): - parts = line.split(':') - if parts: - current_iface = parts[0].strip() + output = result.stdout + + # Parse hciconfig output - "UP RUNNING" appears on a separate line + import re + # Split by interface blocks + blocks = re.split(r'(?=^hci\d+:)', output, flags=re.MULTILINE) + for block in blocks: + if block.strip(): + # Get interface name from first line + first_line = block.split('\n')[0] + match = re.match(r'(hci\d+):', first_line) + if match: + iface_name = match.group(1) + # Check if UP appears anywhere in the block + is_up = 'UP RUNNING' in block or '\tUP ' in block interfaces.append({ - 'name': current_iface, + 'name': iface_name, 'type': 'hci', - 'status': 'up' if 'UP' in line else 'down' + 'status': 'up' if is_up else 'down' }) except FileNotFoundError: pass @@ -5681,39 +5689,72 @@ def stream_bt_scan(process, scan_mode): }) elif scan_mode == 'bluetoothctl': - # bluetoothctl scan output - for line in iter(process.stdout.readline, b''): - line = line.decode('utf-8', errors='replace').strip() + # bluetoothctl scan output - read from pty + import os + import select - # Parse [NEW] Device or [CHG] Device lines - if 'Device' in line and ':' in line: - import re - match = re.search(r'([0-9A-Fa-f:]{17})\s*(.*)', line) - if match: - mac = match.group(1) - name = match.group(2).strip() + master_fd = getattr(process, '_master_fd', None) + if not master_fd: + bt_queue.put({'type': 'error', 'text': 'bluetoothctl pty not available'}) + return - device = { - 'mac': mac, - 'name': name or '[Unknown]', - 'manufacturer': get_manufacturer(mac), - 'type': classify_bt_device(name, None, None), - 'rssi': None, - 'last_seen': time.time() - } + buffer = '' + while process.poll() is None: + # Check if data available + readable, _, _ = select.select([master_fd], [], [], 1.0) + if readable: + try: + data = os.read(master_fd, 4096) + if not data: + break + buffer += data.decode('utf-8', errors='replace') - tracker = detect_tracker(mac, name) - if tracker: - device['tracker'] = tracker + # Process complete lines + while '\n' in buffer: + line, buffer = buffer.split('\n', 1) + line = line.strip() - is_new = mac not in bt_devices - bt_devices[mac] = device + # Remove ANSI escape codes + import re + line = re.sub(r'\x1b\[[0-9;]*m', '', line) + line = re.sub(r'\x1b\[\?.*?[a-zA-Z]', '', line) - bt_queue.put({ - 'type': 'device', - 'action': 'new' if is_new else 'update', - **device - }) + # Parse [NEW] Device or [CHG] Device lines + if 'Device' in line and ':' in line: + match = re.search(r'([0-9A-Fa-f:]{17})\s*(.*)', line) + if match: + mac = match.group(1) + name = match.group(2).strip() + + device = { + 'mac': mac, + 'name': name or '[Unknown]', + 'manufacturer': get_manufacturer(mac), + 'type': classify_bt_device(name, None, None), + 'rssi': None, + 'last_seen': time.time() + } + + tracker = detect_tracker(mac, name) + if tracker: + device['tracker'] = tracker + + is_new = mac not in bt_devices + bt_devices[mac] = device + + bt_queue.put({ + 'type': 'device', + 'action': 'new' if is_new else 'update', + **device + }) + except OSError: + break + + # Close master_fd + try: + os.close(master_fd) + except: + pass except Exception as e: bt_queue.put({'type': 'error', 'text': str(e)}) @@ -5785,17 +5826,26 @@ def start_bt_scan(): ) elif scan_mode == 'bluetoothctl': - # Use bluetoothctl for scanning - cmd = ['bluetoothctl'] + # Use bluetoothctl for scanning with stdbuf to unbuffer output + # Or use script command to provide a pty + import pty + import os + + master_fd, slave_fd = pty.openpty() bt_process = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE + ['bluetoothctl'], + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True ) + os.close(slave_fd) + + # Store master_fd for reading + bt_process._master_fd = master_fd + # Send scan on command - bt_process.stdin.write(b'scan on\n') - bt_process.stdin.flush() + os.write(master_fd, b'scan on\n') elif scan_mode == 'ubertooth': cmd = ['ubertooth-scan', '-t', str(duration)]