diff --git a/utils/meshcore_client.py b/utils/meshcore_client.py index 6179d17..3cf0a64 100644 --- a/utils/meshcore_client.py +++ b/utils/meshcore_client.py @@ -63,6 +63,18 @@ class AsyncWorker: finally: self._loop.close() + async def _wait_or_stop(self, seconds: float) -> bool: + """Wait for seconds; return True if stop was signalled early.""" + stop_task = asyncio.ensure_future(self._asyncio_stop.wait()) + sleep_task = asyncio.ensure_future(asyncio.sleep(seconds)) + done, pending = await asyncio.wait( + [stop_task, sleep_task], + return_when=asyncio.FIRST_COMPLETED, + ) + for t in pending: + t.cancel() + return stop_task in done + async def _connect_with_retry(self) -> None: self._asyncio_stop = asyncio.Event() for attempt, delay in enumerate(_RETRY_DELAYS + [None]): @@ -77,14 +89,8 @@ class AsyncWorker: self._client.on_error(f"Connection failed after retries: {exc}") return # Wait for delay or early stop - try: - await asyncio.wait_for( - asyncio.shield(self._asyncio_stop.wait()), - timeout=delay, - ) - return # stop was signalled during delay - except asyncio.TimeoutError: - pass + if await self._wait_or_stop(delay): + return # stop signalled async def _do_connect(self) -> None: from meshcore import EventType, MeshCore @@ -127,8 +133,15 @@ class AsyncWorker: except Exception as exc: logger.warning("Failed to fetch initial contacts: %s", exc) - # Keep the loop alive until stop is signalled - await self._asyncio_stop.wait() + # Keep the loop alive until stop is signalled. + # Poll _stop_event every second so stop() is honoured even if + # _asyncio_stop.set() was missed due to a startup race. + while not self._stop_event.is_set(): + try: + await asyncio.wait_for(self._asyncio_stop.wait(), timeout=1.0) + break + except asyncio.TimeoutError: + continue if self._mc and self._mc.is_connected: await self._mc.disconnect() @@ -157,7 +170,7 @@ class AsyncWorker: p = event.payload msg = MeshcoreMessage( id=str(uuid.uuid4()), - sender_id="unknown", + sender_id=str(p.get("pubkey_prefix") or p.get("sender_id", "unknown")), recipient_id=f"CHAN{p.get('channel_idx', 0)}", text=str(p.get("text", "")), timestamp=datetime.now(timezone.utc), @@ -179,7 +192,7 @@ class AsyncWorker: p = event.payload node_id = "self" # stats_core is always for the local node battery_mv = p.get("battery_mv") - battery_pct = int(battery_mv / 42) if battery_mv else None # rough: 4200mv = 100% + battery_pct = min(int(battery_mv / 42), 100) if battery_mv else None # rough: 4200mv = 100% t = MeshcoreTelemetry( node_id=node_id, timestamp=datetime.now(timezone.utc), @@ -239,6 +252,9 @@ class AsyncWorker: def _submit(self, coro) -> None: if self._loop and self._loop.is_running(): asyncio.run_coroutine_threadsafe(coro, self._loop) + else: + coro.close() + logger.debug("Command dropped: worker not running") def send_text(self, recipient_id: str, text: str) -> None: async def _send(): @@ -250,6 +266,7 @@ class AsyncWorker: def request_traceroute(self, node_id: str) -> None: async def _trace(): if self._mc: + logger.debug("Requesting traceroute (target hint: %s)", node_id) await self._mc.commands.send_trace(auth_code=0) self._submit(_trace())