mirror of
https://github.com/smittix/intercept.git
synced 2026-06-09 06:31:55 -07:00
77cdd56641
- Enable debug=True on MeshCore.create_ble() to surface verbose logs - Disconnect any existing BlueZ connection before bleak connects to avoid conflicts from prior bluetoothctl/pairing sessions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
320 lines
12 KiB
Python
320 lines
12 KiB
Python
"""Async worker that runs the meshcore library inside a daemon thread.
|
|
|
|
Only this file touches the meshcore library directly. All other Intercept
|
|
code goes through MeshcoreClient in utils/meshcore.py.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import threading
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import TYPE_CHECKING
|
|
|
|
from utils.logging import get_logger
|
|
|
|
if TYPE_CHECKING:
|
|
from utils.meshcore import (
|
|
ConnectionConfig,
|
|
MeshcoreClient,
|
|
)
|
|
|
|
logger = get_logger("intercept.meshcore.worker")
|
|
|
|
_RETRY_DELAYS = [5, 15, 45]
|
|
|
|
|
|
class AsyncWorker:
|
|
"""Owns a daemon asyncio event loop; bridges meshcore events to MeshcoreClient."""
|
|
|
|
def __init__(self, config: ConnectionConfig, client: MeshcoreClient) -> None:
|
|
self._config = config
|
|
self._client = client
|
|
self._loop: asyncio.AbstractEventLoop | None = None
|
|
self._mc = None # MeshCore instance
|
|
self._thread: threading.Thread | None = None
|
|
self._stop_event = threading.Event()
|
|
self._asyncio_stop: asyncio.Event | None = None # set in asyncio thread
|
|
|
|
def start(self) -> None:
|
|
self._stop_event.clear()
|
|
self._loop = asyncio.new_event_loop()
|
|
self._thread = threading.Thread(
|
|
target=self._run,
|
|
daemon=True,
|
|
name="meshcore-asyncio",
|
|
)
|
|
self._thread.start()
|
|
|
|
def stop(self) -> None:
|
|
self._stop_event.set()
|
|
if self._asyncio_stop and self._loop and self._loop.is_running():
|
|
self._loop.call_soon_threadsafe(self._asyncio_stop.set)
|
|
if self._thread:
|
|
self._thread.join(timeout=5)
|
|
|
|
def _run(self) -> None:
|
|
asyncio.set_event_loop(self._loop)
|
|
try:
|
|
self._loop.run_until_complete(self._connect_with_retry())
|
|
except Exception as exc:
|
|
logger.exception("Meshcore asyncio thread crashed: %s", exc)
|
|
finally:
|
|
self._loop.close()
|
|
|
|
async def _wait_or_stop(self, seconds: float) -> bool:
|
|
"""Wait for seconds; return True if stop was signalled early."""
|
|
stop_task = asyncio.ensure_future(self._asyncio_stop.wait())
|
|
sleep_task = asyncio.ensure_future(asyncio.sleep(seconds))
|
|
done, pending = await asyncio.wait(
|
|
[stop_task, sleep_task],
|
|
return_when=asyncio.FIRST_COMPLETED,
|
|
)
|
|
for t in pending:
|
|
t.cancel()
|
|
return stop_task in done
|
|
|
|
async def _connect_with_retry(self) -> None:
|
|
self._asyncio_stop = asyncio.Event()
|
|
for attempt, delay in enumerate(_RETRY_DELAYS + [None]):
|
|
if self._stop_event.is_set():
|
|
return
|
|
try:
|
|
await self._do_connect()
|
|
return
|
|
except Exception as exc:
|
|
logger.warning("Meshcore connect attempt %d failed: %s", attempt + 1, exc)
|
|
if delay is None:
|
|
self._client.on_error(f"Connection failed after retries: {exc}")
|
|
return
|
|
# Wait for delay or early stop
|
|
if await self._wait_or_stop(delay):
|
|
return # stop signalled
|
|
|
|
async def _do_connect(self) -> None:
|
|
from meshcore import EventType, MeshCore
|
|
|
|
from utils.meshcore import BLEConfig, SerialConfig, TCPConfig
|
|
|
|
cfg = self._config
|
|
|
|
if isinstance(cfg, SerialConfig):
|
|
port = cfg.port or "/dev/ttyUSB0"
|
|
self._mc = await MeshCore.create_serial(port=port, baudrate=cfg.baud, debug=False)
|
|
transport, device = "serial", port
|
|
elif isinstance(cfg, TCPConfig):
|
|
self._mc = await MeshCore.create_tcp(host=cfg.host, port=cfg.port, debug=False)
|
|
transport, device = "tcp", f"{cfg.host}:{cfg.port}"
|
|
elif isinstance(cfg, BLEConfig):
|
|
# Disconnect any existing BlueZ connection so bleak gets a clean slate
|
|
if cfg.device_address:
|
|
try:
|
|
import subprocess
|
|
|
|
subprocess.run(
|
|
["bluetoothctl", "disconnect", cfg.device_address],
|
|
timeout=3,
|
|
capture_output=True,
|
|
)
|
|
await asyncio.sleep(1.0)
|
|
except Exception:
|
|
pass
|
|
self._mc = await MeshCore.create_ble(address=cfg.device_address, debug=True)
|
|
transport, device = "ble", cfg.device_address or "auto"
|
|
else:
|
|
raise RuntimeError(f"Unknown connection config type: {type(cfg)}")
|
|
|
|
if self._mc is None:
|
|
raise RuntimeError("Failed to create MeshCore connection")
|
|
|
|
# Subscribe to all relevant events
|
|
self._mc.subscribe(EventType.CONTACT_MSG_RECV, self._on_private_msg)
|
|
self._mc.subscribe(EventType.CHANNEL_MSG_RECV, self._on_channel_msg)
|
|
self._mc.subscribe(EventType.ADVERTISEMENT, self._on_advertisement)
|
|
self._mc.subscribe(EventType.STATS_CORE, self._on_stats_core)
|
|
self._mc.subscribe(EventType.TRACE_DATA, self._on_trace_data)
|
|
self._mc.subscribe(EventType.DISCONNECTED, self._on_disconnected)
|
|
|
|
self._client.on_connected(transport=transport, device=device)
|
|
|
|
# Fetch initial contacts
|
|
try:
|
|
await self._mc.commands.get_contacts(lastmod=0, timeout=5)
|
|
for pubkey, contact in self._mc.contacts.items():
|
|
self._client.on_node(self._contact_to_node(contact))
|
|
except Exception as exc:
|
|
logger.warning("Failed to fetch initial contacts: %s", exc)
|
|
|
|
# Keep the loop alive until stop is signalled.
|
|
# Poll _stop_event every second so stop() is honoured even if
|
|
# _asyncio_stop.set() was missed due to a startup race.
|
|
while not self._stop_event.is_set():
|
|
try:
|
|
await asyncio.wait_for(self._asyncio_stop.wait(), timeout=1.0)
|
|
break
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
|
|
if self._mc and self._mc.is_connected:
|
|
await self._mc.disconnect()
|
|
|
|
# -- Event callbacks (called from asyncio event dispatcher) --
|
|
|
|
def _on_private_msg(self, event) -> None:
|
|
from utils.meshcore import MeshcoreMessage
|
|
|
|
p = event.payload
|
|
msg = MeshcoreMessage(
|
|
id=str(uuid.uuid4()),
|
|
sender_id=str(p.get("pubkey_prefix", "unknown")),
|
|
recipient_id="DIRECT",
|
|
text=str(p.get("text", "")),
|
|
timestamp=datetime.now(timezone.utc),
|
|
hop_count=int(p.get("path_len", 0) or 0),
|
|
snr=None,
|
|
is_direct=True,
|
|
)
|
|
self._client.on_message(msg)
|
|
|
|
def _on_channel_msg(self, event) -> None:
|
|
from utils.meshcore import MeshcoreMessage
|
|
|
|
p = event.payload
|
|
msg = MeshcoreMessage(
|
|
id=str(uuid.uuid4()),
|
|
sender_id=str(p.get("pubkey_prefix") or p.get("sender_id", "unknown")),
|
|
recipient_id=f"CHAN{p.get('channel_idx', 0)}",
|
|
text=str(p.get("text", "")),
|
|
timestamp=datetime.now(timezone.utc),
|
|
hop_count=int(p.get("path_len", 0) or 0),
|
|
snr=None,
|
|
is_direct=False,
|
|
)
|
|
self._client.on_message(msg)
|
|
|
|
def _on_advertisement(self, event) -> None:
|
|
contact = event.payload
|
|
if not contact:
|
|
return
|
|
self._client.on_node(self._contact_to_node(contact))
|
|
|
|
def _on_stats_core(self, event) -> None:
|
|
from utils.meshcore import MeshcoreTelemetry
|
|
|
|
p = event.payload
|
|
node_id = "self" # stats_core is always for the local node
|
|
battery_mv = p.get("battery_mv")
|
|
battery_pct = min(int(battery_mv / 42), 100) if battery_mv else None # rough: 4200mv = 100%
|
|
t = MeshcoreTelemetry(
|
|
node_id=node_id,
|
|
timestamp=datetime.now(timezone.utc),
|
|
battery_pct=battery_pct,
|
|
voltage=battery_mv / 1000.0 if battery_mv else None,
|
|
temperature=None,
|
|
humidity=None,
|
|
uptime_secs=int(p.get("uptime_secs", 0) or 0),
|
|
)
|
|
self._client.on_telemetry(t)
|
|
|
|
def _on_trace_data(self, event) -> None:
|
|
from utils.meshcore import MeshcoreTraceroute
|
|
|
|
p = event.payload or {}
|
|
# TRACE_DATA payload structure varies; extract what we can
|
|
hops = p.get("hops") or p.get("path") or []
|
|
if isinstance(hops, str):
|
|
hops = hops.split(",") if hops else []
|
|
snr_per_hop = p.get("snr_per_hop") or []
|
|
tr = MeshcoreTraceroute(
|
|
origin_id=str(p.get("origin_id", "self")),
|
|
destination_id=str(p.get("destination_id", "unknown")),
|
|
hops=[str(h) for h in hops],
|
|
snr_per_hop=[float(s) for s in snr_per_hop if s is not None],
|
|
timestamp=datetime.now(timezone.utc),
|
|
)
|
|
self._client.on_traceroute(tr)
|
|
|
|
def _on_disconnected(self, event) -> None:
|
|
if not self._stop_event.is_set():
|
|
self._client.on_error("Device disconnected unexpectedly")
|
|
if self._asyncio_stop:
|
|
self._asyncio_stop.set()
|
|
|
|
# -- Helpers --
|
|
|
|
def _contact_to_node(self, contact: dict):
|
|
from utils.meshcore import MeshcoreNode, _is_repeater_contact
|
|
|
|
lat = contact.get("adv_lat")
|
|
lon = contact.get("adv_lon")
|
|
return MeshcoreNode(
|
|
node_id=str(contact.get("public_key", "")),
|
|
name=str(contact.get("adv_name", "Unknown")),
|
|
is_repeater=_is_repeater_contact(contact),
|
|
lat=float(lat) if lat else None,
|
|
lon=float(lon) if lon else None,
|
|
battery_pct=None,
|
|
last_seen=datetime.now(timezone.utc),
|
|
snr=None,
|
|
hops_away=int(contact.get("out_path_len", -1)),
|
|
)
|
|
|
|
# -- Commands (called from Flask thread) --
|
|
|
|
def _submit(self, coro) -> None:
|
|
if self._loop and self._loop.is_running():
|
|
asyncio.run_coroutine_threadsafe(coro, self._loop)
|
|
else:
|
|
coro.close()
|
|
logger.debug("Command dropped: worker not running")
|
|
|
|
def send_text(self, recipient_id: str, text: str) -> None:
|
|
async def _send():
|
|
if self._mc:
|
|
await self._mc.commands.send_msg(recipient_id, text)
|
|
|
|
self._submit(_send())
|
|
|
|
def request_traceroute(self, node_id: str) -> None:
|
|
async def _trace():
|
|
if self._mc:
|
|
logger.debug("Requesting traceroute (target hint: %s)", node_id)
|
|
await self._mc.commands.send_trace(auth_code=0)
|
|
|
|
self._submit(_trace())
|
|
|
|
def scan_ble_sync(self) -> list[dict]:
|
|
if not self._loop or not self._loop.is_running():
|
|
# Start a one-shot loop for the scan
|
|
return asyncio.run(_scan_ble())
|
|
future = asyncio.run_coroutine_threadsafe(_scan_ble(), self._loop)
|
|
try:
|
|
return future.result(timeout=10)
|
|
except Exception as exc:
|
|
logger.warning("BLE scan failed: %s", exc)
|
|
return []
|
|
|
|
|
|
async def _scan_ble() -> list[dict]:
|
|
"""Scan for MeshCore BLE devices using bleak directly."""
|
|
try:
|
|
from bleak import BleakScanner
|
|
|
|
devices = await BleakScanner.discover(timeout=5.0)
|
|
return [
|
|
{
|
|
"address": d.address,
|
|
"name": d.name or "Unknown",
|
|
"rssi": getattr(d, "rssi", None),
|
|
}
|
|
for d in devices
|
|
if d.name and d.name.startswith("MeshCore")
|
|
]
|
|
except ImportError:
|
|
logger.warning("bleak not installed; BLE scan unavailable. Run: pip install bleak")
|
|
return []
|
|
except Exception as exc:
|
|
logger.warning("BLE scan failed: %s", exc)
|
|
return []
|