fix(meshcore): fix traceroute logging, asyncio task leaks, stop race, channel sender, and battery cap

- Log node_id hint in request_traceroute instead of silently dropping it
- Replace asyncio.shield/wait_for pattern with _wait_or_stop() to prevent orphan tasks on retry delays
- Poll _stop_event every 1s in _do_connect keep-alive loop to handle stop() race before _asyncio_stop is set
- Extract pubkey_prefix/sender_id in _on_channel_msg instead of hardcoding "unknown"
- Close coroutine and log in _submit() when worker is not running to prevent ResourceWarning
- Cap battery_pct at 100 to prevent values exceeding 100%

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-05-11 12:33:31 +01:00
parent ba02f761c6
commit 71eaf1d22a
+29 -12
View File
@@ -63,6 +63,18 @@ class AsyncWorker:
finally:
self._loop.close()
async def _wait_or_stop(self, seconds: float) -> bool:
"""Wait for seconds; return True if stop was signalled early."""
stop_task = asyncio.ensure_future(self._asyncio_stop.wait())
sleep_task = asyncio.ensure_future(asyncio.sleep(seconds))
done, pending = await asyncio.wait(
[stop_task, sleep_task],
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
return stop_task in done
async def _connect_with_retry(self) -> None:
self._asyncio_stop = asyncio.Event()
for attempt, delay in enumerate(_RETRY_DELAYS + [None]):
@@ -77,14 +89,8 @@ class AsyncWorker:
self._client.on_error(f"Connection failed after retries: {exc}")
return
# Wait for delay or early stop
try:
await asyncio.wait_for(
asyncio.shield(self._asyncio_stop.wait()),
timeout=delay,
)
return # stop was signalled during delay
except asyncio.TimeoutError:
pass
if await self._wait_or_stop(delay):
return # stop signalled
async def _do_connect(self) -> None:
from meshcore import EventType, MeshCore
@@ -127,8 +133,15 @@ class AsyncWorker:
except Exception as exc:
logger.warning("Failed to fetch initial contacts: %s", exc)
# Keep the loop alive until stop is signalled
await self._asyncio_stop.wait()
# Keep the loop alive until stop is signalled.
# Poll _stop_event every second so stop() is honoured even if
# _asyncio_stop.set() was missed due to a startup race.
while not self._stop_event.is_set():
try:
await asyncio.wait_for(self._asyncio_stop.wait(), timeout=1.0)
break
except asyncio.TimeoutError:
continue
if self._mc and self._mc.is_connected:
await self._mc.disconnect()
@@ -157,7 +170,7 @@ class AsyncWorker:
p = event.payload
msg = MeshcoreMessage(
id=str(uuid.uuid4()),
sender_id="unknown",
sender_id=str(p.get("pubkey_prefix") or p.get("sender_id", "unknown")),
recipient_id=f"CHAN{p.get('channel_idx', 0)}",
text=str(p.get("text", "")),
timestamp=datetime.now(timezone.utc),
@@ -179,7 +192,7 @@ class AsyncWorker:
p = event.payload
node_id = "self" # stats_core is always for the local node
battery_mv = p.get("battery_mv")
battery_pct = int(battery_mv / 42) if battery_mv else None # rough: 4200mv = 100%
battery_pct = min(int(battery_mv / 42), 100) if battery_mv else None # rough: 4200mv = 100%
t = MeshcoreTelemetry(
node_id=node_id,
timestamp=datetime.now(timezone.utc),
@@ -239,6 +252,9 @@ class AsyncWorker:
def _submit(self, coro) -> None:
if self._loop and self._loop.is_running():
asyncio.run_coroutine_threadsafe(coro, self._loop)
else:
coro.close()
logger.debug("Command dropped: worker not running")
def send_text(self, recipient_id: str, text: str) -> None:
async def _send():
@@ -250,6 +266,7 @@ class AsyncWorker:
def request_traceroute(self, node_id: str) -> None:
async def _trace():
if self._mc:
logger.debug("Requesting traceroute (target hint: %s)", node_id)
await self._mc.commands.send_trace(auth_code=0)
self._submit(_trace())