Files
intercept/utils/meshcore_client.py
James Smith 77cdd56641 debug(meshcore): enable BLE debug mode and disconnect before connect
- Enable debug=True on MeshCore.create_ble() to surface verbose logs
- Disconnect any existing BlueZ connection before bleak connects to
  avoid conflicts from prior bluetoothctl/pairing sessions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 21:15:24 +01:00

320 lines
12 KiB
Python

"""Async worker that runs the meshcore library inside a daemon thread.
Only this file touches the meshcore library directly. All other Intercept
code goes through MeshcoreClient in utils/meshcore.py.
"""
from __future__ import annotations
import asyncio
import threading
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from utils.logging import get_logger
if TYPE_CHECKING:
from utils.meshcore import (
ConnectionConfig,
MeshcoreClient,
)
logger = get_logger("intercept.meshcore.worker")
_RETRY_DELAYS = [5, 15, 45]
class AsyncWorker:
"""Owns a daemon asyncio event loop; bridges meshcore events to MeshcoreClient."""
def __init__(self, config: ConnectionConfig, client: MeshcoreClient) -> None:
self._config = config
self._client = client
self._loop: asyncio.AbstractEventLoop | None = None
self._mc = None # MeshCore instance
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
self._asyncio_stop: asyncio.Event | None = None # set in asyncio thread
def start(self) -> None:
self._stop_event.clear()
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(
target=self._run,
daemon=True,
name="meshcore-asyncio",
)
self._thread.start()
def stop(self) -> None:
self._stop_event.set()
if self._asyncio_stop and self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._asyncio_stop.set)
if self._thread:
self._thread.join(timeout=5)
def _run(self) -> None:
asyncio.set_event_loop(self._loop)
try:
self._loop.run_until_complete(self._connect_with_retry())
except Exception as exc:
logger.exception("Meshcore asyncio thread crashed: %s", exc)
finally:
self._loop.close()
async def _wait_or_stop(self, seconds: float) -> bool:
"""Wait for seconds; return True if stop was signalled early."""
stop_task = asyncio.ensure_future(self._asyncio_stop.wait())
sleep_task = asyncio.ensure_future(asyncio.sleep(seconds))
done, pending = await asyncio.wait(
[stop_task, sleep_task],
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
return stop_task in done
async def _connect_with_retry(self) -> None:
self._asyncio_stop = asyncio.Event()
for attempt, delay in enumerate(_RETRY_DELAYS + [None]):
if self._stop_event.is_set():
return
try:
await self._do_connect()
return
except Exception as exc:
logger.warning("Meshcore connect attempt %d failed: %s", attempt + 1, exc)
if delay is None:
self._client.on_error(f"Connection failed after retries: {exc}")
return
# Wait for delay or early stop
if await self._wait_or_stop(delay):
return # stop signalled
async def _do_connect(self) -> None:
from meshcore import EventType, MeshCore
from utils.meshcore import BLEConfig, SerialConfig, TCPConfig
cfg = self._config
if isinstance(cfg, SerialConfig):
port = cfg.port or "/dev/ttyUSB0"
self._mc = await MeshCore.create_serial(port=port, baudrate=cfg.baud, debug=False)
transport, device = "serial", port
elif isinstance(cfg, TCPConfig):
self._mc = await MeshCore.create_tcp(host=cfg.host, port=cfg.port, debug=False)
transport, device = "tcp", f"{cfg.host}:{cfg.port}"
elif isinstance(cfg, BLEConfig):
# Disconnect any existing BlueZ connection so bleak gets a clean slate
if cfg.device_address:
try:
import subprocess
subprocess.run(
["bluetoothctl", "disconnect", cfg.device_address],
timeout=3,
capture_output=True,
)
await asyncio.sleep(1.0)
except Exception:
pass
self._mc = await MeshCore.create_ble(address=cfg.device_address, debug=True)
transport, device = "ble", cfg.device_address or "auto"
else:
raise RuntimeError(f"Unknown connection config type: {type(cfg)}")
if self._mc is None:
raise RuntimeError("Failed to create MeshCore connection")
# Subscribe to all relevant events
self._mc.subscribe(EventType.CONTACT_MSG_RECV, self._on_private_msg)
self._mc.subscribe(EventType.CHANNEL_MSG_RECV, self._on_channel_msg)
self._mc.subscribe(EventType.ADVERTISEMENT, self._on_advertisement)
self._mc.subscribe(EventType.STATS_CORE, self._on_stats_core)
self._mc.subscribe(EventType.TRACE_DATA, self._on_trace_data)
self._mc.subscribe(EventType.DISCONNECTED, self._on_disconnected)
self._client.on_connected(transport=transport, device=device)
# Fetch initial contacts
try:
await self._mc.commands.get_contacts(lastmod=0, timeout=5)
for pubkey, contact in self._mc.contacts.items():
self._client.on_node(self._contact_to_node(contact))
except Exception as exc:
logger.warning("Failed to fetch initial contacts: %s", exc)
# Keep the loop alive until stop is signalled.
# Poll _stop_event every second so stop() is honoured even if
# _asyncio_stop.set() was missed due to a startup race.
while not self._stop_event.is_set():
try:
await asyncio.wait_for(self._asyncio_stop.wait(), timeout=1.0)
break
except asyncio.TimeoutError:
continue
if self._mc and self._mc.is_connected:
await self._mc.disconnect()
# -- Event callbacks (called from asyncio event dispatcher) --
def _on_private_msg(self, event) -> None:
from utils.meshcore import MeshcoreMessage
p = event.payload
msg = MeshcoreMessage(
id=str(uuid.uuid4()),
sender_id=str(p.get("pubkey_prefix", "unknown")),
recipient_id="DIRECT",
text=str(p.get("text", "")),
timestamp=datetime.now(timezone.utc),
hop_count=int(p.get("path_len", 0) or 0),
snr=None,
is_direct=True,
)
self._client.on_message(msg)
def _on_channel_msg(self, event) -> None:
from utils.meshcore import MeshcoreMessage
p = event.payload
msg = MeshcoreMessage(
id=str(uuid.uuid4()),
sender_id=str(p.get("pubkey_prefix") or p.get("sender_id", "unknown")),
recipient_id=f"CHAN{p.get('channel_idx', 0)}",
text=str(p.get("text", "")),
timestamp=datetime.now(timezone.utc),
hop_count=int(p.get("path_len", 0) or 0),
snr=None,
is_direct=False,
)
self._client.on_message(msg)
def _on_advertisement(self, event) -> None:
contact = event.payload
if not contact:
return
self._client.on_node(self._contact_to_node(contact))
def _on_stats_core(self, event) -> None:
from utils.meshcore import MeshcoreTelemetry
p = event.payload
node_id = "self" # stats_core is always for the local node
battery_mv = p.get("battery_mv")
battery_pct = min(int(battery_mv / 42), 100) if battery_mv else None # rough: 4200mv = 100%
t = MeshcoreTelemetry(
node_id=node_id,
timestamp=datetime.now(timezone.utc),
battery_pct=battery_pct,
voltage=battery_mv / 1000.0 if battery_mv else None,
temperature=None,
humidity=None,
uptime_secs=int(p.get("uptime_secs", 0) or 0),
)
self._client.on_telemetry(t)
def _on_trace_data(self, event) -> None:
from utils.meshcore import MeshcoreTraceroute
p = event.payload or {}
# TRACE_DATA payload structure varies; extract what we can
hops = p.get("hops") or p.get("path") or []
if isinstance(hops, str):
hops = hops.split(",") if hops else []
snr_per_hop = p.get("snr_per_hop") or []
tr = MeshcoreTraceroute(
origin_id=str(p.get("origin_id", "self")),
destination_id=str(p.get("destination_id", "unknown")),
hops=[str(h) for h in hops],
snr_per_hop=[float(s) for s in snr_per_hop if s is not None],
timestamp=datetime.now(timezone.utc),
)
self._client.on_traceroute(tr)
def _on_disconnected(self, event) -> None:
if not self._stop_event.is_set():
self._client.on_error("Device disconnected unexpectedly")
if self._asyncio_stop:
self._asyncio_stop.set()
# -- Helpers --
def _contact_to_node(self, contact: dict):
from utils.meshcore import MeshcoreNode, _is_repeater_contact
lat = contact.get("adv_lat")
lon = contact.get("adv_lon")
return MeshcoreNode(
node_id=str(contact.get("public_key", "")),
name=str(contact.get("adv_name", "Unknown")),
is_repeater=_is_repeater_contact(contact),
lat=float(lat) if lat else None,
lon=float(lon) if lon else None,
battery_pct=None,
last_seen=datetime.now(timezone.utc),
snr=None,
hops_away=int(contact.get("out_path_len", -1)),
)
# -- Commands (called from Flask thread) --
def _submit(self, coro) -> None:
if self._loop and self._loop.is_running():
asyncio.run_coroutine_threadsafe(coro, self._loop)
else:
coro.close()
logger.debug("Command dropped: worker not running")
def send_text(self, recipient_id: str, text: str) -> None:
async def _send():
if self._mc:
await self._mc.commands.send_msg(recipient_id, text)
self._submit(_send())
def request_traceroute(self, node_id: str) -> None:
async def _trace():
if self._mc:
logger.debug("Requesting traceroute (target hint: %s)", node_id)
await self._mc.commands.send_trace(auth_code=0)
self._submit(_trace())
def scan_ble_sync(self) -> list[dict]:
if not self._loop or not self._loop.is_running():
# Start a one-shot loop for the scan
return asyncio.run(_scan_ble())
future = asyncio.run_coroutine_threadsafe(_scan_ble(), self._loop)
try:
return future.result(timeout=10)
except Exception as exc:
logger.warning("BLE scan failed: %s", exc)
return []
async def _scan_ble() -> list[dict]:
"""Scan for MeshCore BLE devices using bleak directly."""
try:
from bleak import BleakScanner
devices = await BleakScanner.discover(timeout=5.0)
return [
{
"address": d.address,
"name": d.name or "Unknown",
"rssi": getattr(d, "rssi", None),
}
for d in devices
if d.name and d.name.startswith("MeshCore")
]
except ImportError:
logger.warning("bleak not installed; BLE scan unavailable. Run: pip install bleak")
return []
except Exception as exc:
logger.warning("BLE scan failed: %s", exc)
return []