From ba02f761c6b60e5dd259a6ae0137426ac2350b1a Mon Sep 17 00:00:00 2001 From: James Smith Date: Mon, 11 May 2026 11:18:50 +0100 Subject: [PATCH] feat(meshcore): add async worker bridge (utils/meshcore_client.py) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements AsyncWorker — the daemon asyncio thread that owns the meshcore library connection, subscribes to all relevant EventTypes, and feeds events back into MeshcoreClient via on_message/on_node/on_telemetry/on_traceroute/ on_connected/on_error. Includes retry-with-backoff (3 attempts: 5s/15s/45s), thread-safe send_text/request_traceroute/scan_ble_sync for Flask callers, and a standalone _scan_ble() coroutine using bleak.BleakScanner. Co-Authored-By: Claude Sonnet 4.6 --- utils/meshcore_client.py | 289 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 utils/meshcore_client.py diff --git a/utils/meshcore_client.py b/utils/meshcore_client.py new file mode 100644 index 0000000..6179d17 --- /dev/null +++ b/utils/meshcore_client.py @@ -0,0 +1,289 @@ +"""Async worker that runs the meshcore library inside a daemon thread. + +Only this file touches the meshcore library directly. All other Intercept +code goes through MeshcoreClient in utils/meshcore.py. +""" + +from __future__ import annotations + +import asyncio +import threading +import uuid +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from utils.logging import get_logger + +if TYPE_CHECKING: + from utils.meshcore import ( + ConnectionConfig, + MeshcoreClient, + ) + +logger = get_logger("intercept.meshcore.worker") + +_RETRY_DELAYS = [5, 15, 45] + + +class AsyncWorker: + """Owns a daemon asyncio event loop; bridges meshcore events to MeshcoreClient.""" + + def __init__(self, config: ConnectionConfig, client: MeshcoreClient) -> None: + self._config = config + self._client = client + self._loop: asyncio.AbstractEventLoop | None = None + self._mc = None # MeshCore instance + self._thread: threading.Thread | None = None + self._stop_event = threading.Event() + self._asyncio_stop: asyncio.Event | None = None # set in asyncio thread + + def start(self) -> None: + self._stop_event.clear() + self._loop = asyncio.new_event_loop() + self._thread = threading.Thread( + target=self._run, + daemon=True, + name="meshcore-asyncio", + ) + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + if self._asyncio_stop and self._loop and self._loop.is_running(): + self._loop.call_soon_threadsafe(self._asyncio_stop.set) + if self._thread: + self._thread.join(timeout=5) + + def _run(self) -> None: + asyncio.set_event_loop(self._loop) + try: + self._loop.run_until_complete(self._connect_with_retry()) + except Exception as exc: + logger.exception("Meshcore asyncio thread crashed: %s", exc) + finally: + self._loop.close() + + async def _connect_with_retry(self) -> None: + self._asyncio_stop = asyncio.Event() + for attempt, delay in enumerate(_RETRY_DELAYS + [None]): + if self._stop_event.is_set(): + return + try: + await self._do_connect() + return + except Exception as exc: + logger.warning("Meshcore connect attempt %d failed: %s", attempt + 1, exc) + if delay is None: + self._client.on_error(f"Connection failed after retries: {exc}") + return + # Wait for delay or early stop + try: + await asyncio.wait_for( + asyncio.shield(self._asyncio_stop.wait()), + timeout=delay, + ) + return # stop was signalled during delay + except asyncio.TimeoutError: + pass + + async def _do_connect(self) -> None: + from meshcore import EventType, MeshCore + + from utils.meshcore import BLEConfig, SerialConfig, TCPConfig + + cfg = self._config + + if isinstance(cfg, SerialConfig): + port = cfg.port or "/dev/ttyUSB0" + self._mc = await MeshCore.create_serial(port=port, baudrate=cfg.baud, debug=False) + transport, device = "serial", port + elif isinstance(cfg, TCPConfig): + self._mc = await MeshCore.create_tcp(host=cfg.host, port=cfg.port, debug=False) + transport, device = "tcp", f"{cfg.host}:{cfg.port}" + elif isinstance(cfg, BLEConfig): + self._mc = await MeshCore.create_ble(address=cfg.device_address, debug=False) + transport, device = "ble", cfg.device_address or "auto" + else: + raise RuntimeError(f"Unknown connection config type: {type(cfg)}") + + if self._mc is None: + raise RuntimeError("Failed to create MeshCore connection") + + # Subscribe to all relevant events + self._mc.subscribe(EventType.CONTACT_MSG_RECV, self._on_private_msg) + self._mc.subscribe(EventType.CHANNEL_MSG_RECV, self._on_channel_msg) + self._mc.subscribe(EventType.ADVERTISEMENT, self._on_advertisement) + self._mc.subscribe(EventType.STATS_CORE, self._on_stats_core) + self._mc.subscribe(EventType.TRACE_DATA, self._on_trace_data) + self._mc.subscribe(EventType.DISCONNECTED, self._on_disconnected) + + self._client.on_connected(transport=transport, device=device) + + # Fetch initial contacts + try: + await self._mc.commands.get_contacts(lastmod=0, timeout=5) + for pubkey, contact in self._mc.contacts.items(): + self._client.on_node(self._contact_to_node(contact)) + except Exception as exc: + logger.warning("Failed to fetch initial contacts: %s", exc) + + # Keep the loop alive until stop is signalled + await self._asyncio_stop.wait() + + if self._mc and self._mc.is_connected: + await self._mc.disconnect() + + # -- Event callbacks (called from asyncio event dispatcher) -- + + def _on_private_msg(self, event) -> None: + from utils.meshcore import MeshcoreMessage + + p = event.payload + msg = MeshcoreMessage( + id=str(uuid.uuid4()), + sender_id=str(p.get("pubkey_prefix", "unknown")), + recipient_id="DIRECT", + text=str(p.get("text", "")), + timestamp=datetime.now(timezone.utc), + hop_count=int(p.get("path_len", 0) or 0), + snr=None, + is_direct=True, + ) + self._client.on_message(msg) + + def _on_channel_msg(self, event) -> None: + from utils.meshcore import MeshcoreMessage + + p = event.payload + msg = MeshcoreMessage( + id=str(uuid.uuid4()), + sender_id="unknown", + recipient_id=f"CHAN{p.get('channel_idx', 0)}", + text=str(p.get("text", "")), + timestamp=datetime.now(timezone.utc), + hop_count=int(p.get("path_len", 0) or 0), + snr=None, + is_direct=False, + ) + self._client.on_message(msg) + + def _on_advertisement(self, event) -> None: + contact = event.payload + if not contact: + return + self._client.on_node(self._contact_to_node(contact)) + + def _on_stats_core(self, event) -> None: + from utils.meshcore import MeshcoreTelemetry + + p = event.payload + node_id = "self" # stats_core is always for the local node + battery_mv = p.get("battery_mv") + battery_pct = int(battery_mv / 42) if battery_mv else None # rough: 4200mv = 100% + t = MeshcoreTelemetry( + node_id=node_id, + timestamp=datetime.now(timezone.utc), + battery_pct=battery_pct, + voltage=battery_mv / 1000.0 if battery_mv else None, + temperature=None, + humidity=None, + uptime_secs=int(p.get("uptime_secs", 0) or 0), + ) + self._client.on_telemetry(t) + + def _on_trace_data(self, event) -> None: + from utils.meshcore import MeshcoreTraceroute + + p = event.payload or {} + # TRACE_DATA payload structure varies; extract what we can + hops = p.get("hops") or p.get("path") or [] + if isinstance(hops, str): + hops = hops.split(",") if hops else [] + snr_per_hop = p.get("snr_per_hop") or [] + tr = MeshcoreTraceroute( + origin_id=str(p.get("origin_id", "self")), + destination_id=str(p.get("destination_id", "unknown")), + hops=[str(h) for h in hops], + snr_per_hop=[float(s) for s in snr_per_hop if s is not None], + timestamp=datetime.now(timezone.utc), + ) + self._client.on_traceroute(tr) + + def _on_disconnected(self, event) -> None: + if not self._stop_event.is_set(): + self._client.on_error("Device disconnected unexpectedly") + if self._asyncio_stop: + self._asyncio_stop.set() + + # -- Helpers -- + + def _contact_to_node(self, contact: dict): + from utils.meshcore import MeshcoreNode, _is_repeater_contact + + lat = contact.get("adv_lat") + lon = contact.get("adv_lon") + return MeshcoreNode( + node_id=str(contact.get("public_key", "")), + name=str(contact.get("adv_name", "Unknown")), + is_repeater=_is_repeater_contact(contact), + lat=float(lat) if lat else None, + lon=float(lon) if lon else None, + battery_pct=None, + last_seen=datetime.now(timezone.utc), + snr=None, + hops_away=int(contact.get("out_path_len", -1)), + ) + + # -- Commands (called from Flask thread) -- + + def _submit(self, coro) -> None: + if self._loop and self._loop.is_running(): + asyncio.run_coroutine_threadsafe(coro, self._loop) + + def send_text(self, recipient_id: str, text: str) -> None: + async def _send(): + if self._mc: + await self._mc.commands.send_msg(recipient_id, text) + + self._submit(_send()) + + def request_traceroute(self, node_id: str) -> None: + async def _trace(): + if self._mc: + await self._mc.commands.send_trace(auth_code=0) + + self._submit(_trace()) + + def scan_ble_sync(self) -> list[dict]: + if not self._loop or not self._loop.is_running(): + # Start a one-shot loop for the scan + return asyncio.run(_scan_ble()) + future = asyncio.run_coroutine_threadsafe(_scan_ble(), self._loop) + try: + return future.result(timeout=10) + except Exception as exc: + logger.warning("BLE scan failed: %s", exc) + return [] + + +async def _scan_ble() -> list[dict]: + """Scan for MeshCore BLE devices using bleak directly.""" + try: + from bleak import BleakScanner + + devices = await BleakScanner.discover(timeout=5.0) + return [ + { + "address": d.address, + "name": d.name or "Unknown", + "rssi": getattr(d, "rssi", None), + } + for d in devices + if d.name and d.name.startswith("MeshCore") + ] + except ImportError: + logger.warning("bleak not installed; BLE scan unavailable. Run: pip install bleak") + return [] + except Exception as exc: + logger.warning("BLE scan failed: %s", exc) + return []