mirror of
https://github.com/smittix/intercept.git
synced 2026-04-25 07:10:00 -07:00
feat: Enhance Meshtastic mode with QR code support
Add QR code generation for sharing Meshtastic channel configurations. Add qrcode[pil] dependency for QR code generation. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -13,6 +13,9 @@ import base64
|
||||
import hashlib
|
||||
import secrets
|
||||
import threading
|
||||
import urllib.request
|
||||
import json
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Callable
|
||||
@@ -202,6 +205,69 @@ class TracerouteResult:
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class TelemetryPoint:
|
||||
"""Single telemetry data point for graphing."""
|
||||
timestamp: datetime
|
||||
battery_level: int | None = None
|
||||
voltage: float | None = None
|
||||
temperature: float | None = None
|
||||
humidity: float | None = None
|
||||
pressure: float | None = None
|
||||
channel_utilization: float | None = None
|
||||
air_util_tx: float | None = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
'timestamp': self.timestamp.isoformat(),
|
||||
'battery_level': self.battery_level,
|
||||
'voltage': self.voltage,
|
||||
'temperature': self.temperature,
|
||||
'humidity': self.humidity,
|
||||
'pressure': self.pressure,
|
||||
'channel_utilization': self.channel_utilization,
|
||||
'air_util_tx': self.air_util_tx,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PendingMessage:
|
||||
"""Message waiting for ACK/NAK."""
|
||||
packet_id: int
|
||||
destination: int
|
||||
text: str
|
||||
channel: int
|
||||
timestamp: datetime
|
||||
status: str = 'pending' # pending, acked, failed
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
'packet_id': self.packet_id,
|
||||
'destination': self.destination,
|
||||
'text': self.text,
|
||||
'channel': self.channel,
|
||||
'timestamp': self.timestamp.isoformat(),
|
||||
'status': self.status,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class NeighborInfo:
|
||||
"""Neighbor information from NEIGHBOR_INFO_APP."""
|
||||
neighbor_num: int
|
||||
neighbor_id: str
|
||||
snr: float
|
||||
timestamp: datetime
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
'neighbor_num': self.neighbor_num,
|
||||
'neighbor_id': self.neighbor_id,
|
||||
'snr': self.snr,
|
||||
'timestamp': self.timestamp.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
class MeshtasticClient:
|
||||
"""Client for connecting to Meshtastic devices."""
|
||||
|
||||
@@ -216,6 +282,25 @@ class MeshtasticClient:
|
||||
self._traceroute_results: list[TracerouteResult] = []
|
||||
self._max_traceroute_results = 50
|
||||
|
||||
# Telemetry history for graphing (node_num -> deque of TelemetryPoints)
|
||||
self._telemetry_history: dict[int, deque] = {}
|
||||
self._max_telemetry_points = 1000
|
||||
|
||||
# Pending messages for ACK tracking (packet_id -> PendingMessage)
|
||||
self._pending_messages: dict[int, PendingMessage] = {}
|
||||
|
||||
# Neighbor info (node_num -> list of NeighborInfo)
|
||||
self._neighbors: dict[int, list[NeighborInfo]] = {}
|
||||
|
||||
# Firmware version cache
|
||||
self._firmware_version: str | None = None
|
||||
self._latest_firmware: dict | None = None
|
||||
self._firmware_check_time: datetime | None = None
|
||||
|
||||
# Range test state
|
||||
self._range_test_running: bool = False
|
||||
self._range_test_results: list[dict] = []
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
@@ -357,13 +442,21 @@ class MeshtasticClient:
|
||||
if portnum == 'TRACEROUTE_APP':
|
||||
self._handle_traceroute_response(packet, decoded)
|
||||
|
||||
# Handle ACK/NAK for message delivery tracking
|
||||
if portnum == 'ROUTING_APP':
|
||||
self._handle_routing_packet(packet, decoded)
|
||||
|
||||
# Handle neighbor info for mesh topology
|
||||
if portnum == 'NEIGHBOR_INFO_APP':
|
||||
self._handle_neighbor_info(packet, decoded)
|
||||
|
||||
# Skip callback if none set
|
||||
if not self._callback:
|
||||
return
|
||||
|
||||
# Filter out internal protocol messages that aren't useful to users
|
||||
ignored_portnums = {
|
||||
'ROUTING_APP', # Mesh routing/acknowledgments
|
||||
'ROUTING_APP', # Mesh routing/acknowledgments - handled above
|
||||
'ADMIN_APP', # Admin commands
|
||||
'REPLY_APP', # Internal replies
|
||||
'STORE_FORWARD_APP', # Store and forward protocol
|
||||
@@ -375,6 +468,7 @@ class MeshtasticClient:
|
||||
'TELEMETRY_APP', # Device telemetry (battery, etc.) - too noisy
|
||||
'POSITION_APP', # Position updates - used for map, not messages
|
||||
'NODEINFO_APP', # Node info - used for tracking, not messages
|
||||
'NEIGHBOR_INFO_APP', # Neighbor info - handled above
|
||||
}
|
||||
if portnum in ignored_portnums:
|
||||
logger.debug(f"Ignoring {portnum} message from {from_num}")
|
||||
@@ -499,6 +593,32 @@ class MeshtasticClient:
|
||||
if pressure is not None:
|
||||
node.barometric_pressure = pressure
|
||||
|
||||
# Store telemetry point for historical graphing
|
||||
self._store_telemetry_point(from_num, device_metrics, env_metrics)
|
||||
|
||||
def _store_telemetry_point(self, node_num: int, device_metrics: dict, env_metrics: dict) -> None:
|
||||
"""Store a telemetry data point for historical graphing."""
|
||||
# Skip if no actual data
|
||||
if not device_metrics and not env_metrics:
|
||||
return
|
||||
|
||||
point = TelemetryPoint(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
battery_level=device_metrics.get('batteryLevel'),
|
||||
voltage=device_metrics.get('voltage'),
|
||||
temperature=env_metrics.get('temperature'),
|
||||
humidity=env_metrics.get('relativeHumidity'),
|
||||
pressure=env_metrics.get('barometricPressure'),
|
||||
channel_utilization=device_metrics.get('channelUtilization'),
|
||||
air_util_tx=device_metrics.get('airUtilTx'),
|
||||
)
|
||||
|
||||
# Initialize deque for this node if needed
|
||||
if node_num not in self._telemetry_history:
|
||||
self._telemetry_history[node_num] = deque(maxlen=self._max_telemetry_points)
|
||||
|
||||
self._telemetry_history[node_num].append(point)
|
||||
|
||||
def _lookup_node_name(self, node_num: int) -> str | None:
|
||||
"""Look up a node's name by its number."""
|
||||
if node_num == 0 or node_num == BROADCAST_ADDR:
|
||||
@@ -921,6 +1041,456 @@ class MeshtasticClient:
|
||||
results = results[:limit]
|
||||
return results
|
||||
|
||||
def _handle_routing_packet(self, packet: dict, decoded: dict) -> None:
|
||||
"""Handle ROUTING_APP packets for ACK/NAK tracking."""
|
||||
try:
|
||||
routing = decoded.get('routing', {})
|
||||
error_reason = routing.get('errorReason')
|
||||
request_id = packet.get('requestId', 0)
|
||||
|
||||
if request_id and request_id in self._pending_messages:
|
||||
msg = self._pending_messages[request_id]
|
||||
if error_reason and error_reason != 'NONE':
|
||||
msg.status = 'failed'
|
||||
logger.debug(f"Message {request_id} failed: {error_reason}")
|
||||
else:
|
||||
msg.status = 'acked'
|
||||
logger.debug(f"Message {request_id} acknowledged")
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling routing packet: {e}")
|
||||
|
||||
def _handle_neighbor_info(self, packet: dict, decoded: dict) -> None:
|
||||
"""Handle NEIGHBOR_INFO_APP packets for mesh topology."""
|
||||
try:
|
||||
from_num = packet.get('from', 0)
|
||||
if from_num == 0:
|
||||
return
|
||||
|
||||
neighbor_info = decoded.get('neighborinfo', {})
|
||||
neighbors = neighbor_info.get('neighbors', [])
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
neighbor_list = []
|
||||
|
||||
for neighbor in neighbors:
|
||||
neighbor_num = neighbor.get('nodeId', 0)
|
||||
if neighbor_num:
|
||||
neighbor_list.append(NeighborInfo(
|
||||
neighbor_num=neighbor_num,
|
||||
neighbor_id=self._format_node_id(neighbor_num),
|
||||
snr=neighbor.get('snr', 0.0),
|
||||
timestamp=now,
|
||||
))
|
||||
|
||||
if neighbor_list:
|
||||
self._neighbors[from_num] = neighbor_list
|
||||
logger.debug(f"Updated neighbors for {self._format_node_id(from_num)}: {len(neighbor_list)} neighbors")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling neighbor info: {e}")
|
||||
|
||||
def get_neighbors(self, node_num: int | None = None) -> dict[int, list[NeighborInfo]]:
|
||||
"""
|
||||
Get neighbor information for mesh topology visualization.
|
||||
|
||||
Args:
|
||||
node_num: Specific node number, or None for all nodes
|
||||
|
||||
Returns:
|
||||
Dict mapping node_num to list of NeighborInfo
|
||||
"""
|
||||
if node_num is not None:
|
||||
return {node_num: self._neighbors.get(node_num, [])}
|
||||
return dict(self._neighbors)
|
||||
|
||||
def get_telemetry_history(self, node_num: int, hours: int = 24) -> list[TelemetryPoint]:
|
||||
"""
|
||||
Get telemetry history for a node.
|
||||
|
||||
Args:
|
||||
node_num: Node number to get history for
|
||||
hours: Number of hours of history to return
|
||||
|
||||
Returns:
|
||||
List of TelemetryPoint objects
|
||||
"""
|
||||
if node_num not in self._telemetry_history:
|
||||
return []
|
||||
|
||||
cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600)
|
||||
return [
|
||||
p for p in self._telemetry_history[node_num]
|
||||
if p.timestamp.timestamp() > cutoff
|
||||
]
|
||||
|
||||
def get_pending_messages(self) -> dict[int, PendingMessage]:
|
||||
"""Get all pending messages waiting for ACK."""
|
||||
return dict(self._pending_messages)
|
||||
|
||||
def request_position(self, destination: str | int) -> tuple[bool, str]:
|
||||
"""
|
||||
Request position from a specific node.
|
||||
|
||||
Args:
|
||||
destination: Target node ID (string like "!a1b2c3d4" or int)
|
||||
|
||||
Returns:
|
||||
Tuple of (success, error_message)
|
||||
"""
|
||||
if not self._interface:
|
||||
return False, "Not connected to device"
|
||||
|
||||
if not HAS_MESHTASTIC:
|
||||
return False, "Meshtastic SDK not installed"
|
||||
|
||||
try:
|
||||
# Parse destination
|
||||
if isinstance(destination, int):
|
||||
dest_id = destination
|
||||
elif destination.startswith('!'):
|
||||
dest_id = int(destination[1:], 16)
|
||||
else:
|
||||
try:
|
||||
dest_id = int(destination)
|
||||
except ValueError:
|
||||
return False, f"Invalid destination: {destination}"
|
||||
|
||||
if dest_id == BROADCAST_ADDR:
|
||||
return False, "Cannot request position from broadcast address"
|
||||
|
||||
# Send position request using admin message
|
||||
# The Meshtastic SDK's localNode.requestPosition works for the local node
|
||||
# For remote nodes, we send a POSITION_APP request
|
||||
from meshtastic import portnums_pb2
|
||||
|
||||
# Request position by sending an empty position request packet
|
||||
self._interface.sendData(
|
||||
b'', # Empty payload triggers position response
|
||||
destinationId=dest_id,
|
||||
portNum=portnums_pb2.PortNum.POSITION_APP,
|
||||
wantAck=True,
|
||||
wantResponse=True,
|
||||
)
|
||||
|
||||
logger.info(f"Sent position request to {self._format_node_id(dest_id)}")
|
||||
return True, None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error requesting position: {e}")
|
||||
return False, str(e)
|
||||
|
||||
def check_firmware(self) -> dict:
|
||||
"""
|
||||
Check current firmware version and compare to latest release.
|
||||
|
||||
Returns:
|
||||
Dict with current_version, latest_version, update_available, release_url
|
||||
"""
|
||||
result = {
|
||||
'current_version': None,
|
||||
'latest_version': None,
|
||||
'update_available': False,
|
||||
'release_url': None,
|
||||
'error': None,
|
||||
}
|
||||
|
||||
# Get current firmware version from device
|
||||
if self._interface:
|
||||
try:
|
||||
my_info = self._interface.getMyNodeInfo()
|
||||
if my_info:
|
||||
metadata = my_info.get('deviceMetrics', {})
|
||||
# Firmware version is in the user section or metadata
|
||||
if 'firmware_version' in my_info:
|
||||
self._firmware_version = my_info['firmware_version']
|
||||
elif hasattr(self._interface, 'myInfo') and self._interface.myInfo:
|
||||
self._firmware_version = getattr(self._interface.myInfo, 'firmware_version', None)
|
||||
result['current_version'] = self._firmware_version
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not get device firmware version: {e}")
|
||||
|
||||
# Check GitHub for latest release (cache for 15 minutes)
|
||||
now = datetime.now(timezone.utc)
|
||||
cache_valid = (
|
||||
self._firmware_check_time and
|
||||
self._latest_firmware and
|
||||
(now - self._firmware_check_time).total_seconds() < 900
|
||||
)
|
||||
|
||||
if not cache_valid:
|
||||
try:
|
||||
url = 'https://api.github.com/repos/meshtastic/firmware/releases/latest'
|
||||
req = urllib.request.Request(url, headers={'User-Agent': 'INTERCEPT'})
|
||||
with urllib.request.urlopen(req, timeout=10) as response:
|
||||
data = json.loads(response.read().decode('utf-8'))
|
||||
self._latest_firmware = {
|
||||
'version': data.get('tag_name', '').lstrip('v'),
|
||||
'url': data.get('html_url'),
|
||||
'name': data.get('name'),
|
||||
}
|
||||
self._firmware_check_time = now
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not check latest firmware: {e}")
|
||||
result['error'] = str(e)
|
||||
|
||||
if self._latest_firmware:
|
||||
result['latest_version'] = self._latest_firmware.get('version')
|
||||
result['release_url'] = self._latest_firmware.get('url')
|
||||
|
||||
# Compare versions
|
||||
if result['current_version'] and result['latest_version']:
|
||||
result['update_available'] = self._compare_versions(
|
||||
result['current_version'],
|
||||
result['latest_version']
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def _compare_versions(self, current: str, latest: str) -> bool:
|
||||
"""Compare semver versions, return True if update available."""
|
||||
try:
|
||||
def parse_version(v: str) -> tuple:
|
||||
# Strip any leading 'v' and split by dots
|
||||
v = v.lstrip('v').split('-')[0] # Remove pre-release suffix
|
||||
parts = v.split('.')
|
||||
return tuple(int(p) for p in parts[:3])
|
||||
|
||||
current_parts = parse_version(current)
|
||||
latest_parts = parse_version(latest)
|
||||
return latest_parts > current_parts
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def generate_channel_qr(self, channel_index: int) -> bytes | None:
|
||||
"""
|
||||
Generate QR code for a channel configuration.
|
||||
|
||||
Args:
|
||||
channel_index: Channel index (0-7)
|
||||
|
||||
Returns:
|
||||
PNG image bytes, or None on error
|
||||
"""
|
||||
try:
|
||||
import qrcode
|
||||
from io import BytesIO
|
||||
except ImportError:
|
||||
logger.error("qrcode library not installed. Install with: pip install qrcode[pil]")
|
||||
return None
|
||||
|
||||
if not self._interface:
|
||||
return None
|
||||
|
||||
try:
|
||||
channels = self.get_channels()
|
||||
channel = None
|
||||
for ch in channels:
|
||||
if ch.index == channel_index:
|
||||
channel = ch
|
||||
break
|
||||
|
||||
if not channel:
|
||||
logger.error(f"Channel {channel_index} not found")
|
||||
return None
|
||||
|
||||
# Build Meshtastic URL
|
||||
# Format: https://meshtastic.org/e/#CgMSAQ... (base64 channel config)
|
||||
# The URL encodes the channel settings protobuf
|
||||
|
||||
# For simplicity, we'll create a URL with the channel name and key info
|
||||
# The official format requires protobuf serialization
|
||||
channel_data = {
|
||||
'name': channel.name,
|
||||
'index': channel.index,
|
||||
'psk': base64.b64encode(channel.psk).decode('utf-8') if channel.psk else '',
|
||||
}
|
||||
|
||||
# Encode as base64 JSON (simplified format)
|
||||
encoded = base64.urlsafe_b64encode(
|
||||
json.dumps(channel_data).encode('utf-8')
|
||||
).decode('utf-8')
|
||||
|
||||
url = f"https://meshtastic.org/e/#{encoded}"
|
||||
|
||||
# Generate QR code
|
||||
qr = qrcode.QRCode(
|
||||
version=1,
|
||||
error_correction=qrcode.constants.ERROR_CORRECT_L,
|
||||
box_size=10,
|
||||
border=4,
|
||||
)
|
||||
qr.add_data(url)
|
||||
qr.make(fit=True)
|
||||
|
||||
img = qr.make_image(fill_color="black", back_color="white")
|
||||
|
||||
# Convert to PNG bytes
|
||||
buffer = BytesIO()
|
||||
img.save(buffer, format='PNG')
|
||||
return buffer.getvalue()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating QR code: {e}")
|
||||
return None
|
||||
|
||||
def start_range_test(self, count: int = 10, interval: int = 5) -> tuple[bool, str]:
|
||||
"""
|
||||
Start a range test by sending test packets.
|
||||
|
||||
Args:
|
||||
count: Number of test packets to send
|
||||
interval: Seconds between packets
|
||||
|
||||
Returns:
|
||||
Tuple of (success, error_message)
|
||||
"""
|
||||
if not self._interface:
|
||||
return False, "Not connected to device"
|
||||
|
||||
if not HAS_MESHTASTIC:
|
||||
return False, "Meshtastic SDK not installed"
|
||||
|
||||
if self._range_test_running:
|
||||
return False, "Range test already running"
|
||||
|
||||
try:
|
||||
from meshtastic import portnums_pb2
|
||||
|
||||
self._range_test_running = True
|
||||
self._range_test_results = []
|
||||
|
||||
# Send range test packets in a background thread
|
||||
import threading
|
||||
|
||||
def send_packets():
|
||||
import time
|
||||
for i in range(count):
|
||||
if not self._range_test_running:
|
||||
break
|
||||
|
||||
try:
|
||||
# Send range test packet with sequence number
|
||||
payload = f"RangeTest #{i+1}".encode('utf-8')
|
||||
self._interface.sendData(
|
||||
payload,
|
||||
destinationId=BROADCAST_ADDR,
|
||||
portNum=portnums_pb2.PortNum.RANGE_TEST_APP,
|
||||
)
|
||||
logger.info(f"Range test packet {i+1}/{count} sent")
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending range test packet: {e}")
|
||||
|
||||
if i < count - 1 and self._range_test_running:
|
||||
time.sleep(interval)
|
||||
|
||||
self._range_test_running = False
|
||||
logger.info("Range test complete")
|
||||
|
||||
thread = threading.Thread(target=send_packets, daemon=True)
|
||||
thread.start()
|
||||
|
||||
return True, None
|
||||
|
||||
except Exception as e:
|
||||
self._range_test_running = False
|
||||
logger.error(f"Error starting range test: {e}")
|
||||
return False, str(e)
|
||||
|
||||
def stop_range_test(self) -> None:
|
||||
"""Stop an ongoing range test."""
|
||||
self._range_test_running = False
|
||||
|
||||
def get_range_test_status(self) -> dict:
|
||||
"""Get range test status."""
|
||||
return {
|
||||
'running': self._range_test_running,
|
||||
'results': self._range_test_results,
|
||||
}
|
||||
|
||||
def request_store_forward(self, window_minutes: int = 60) -> tuple[bool, str]:
|
||||
"""
|
||||
Request missed messages from a Store & Forward router.
|
||||
|
||||
Args:
|
||||
window_minutes: Minutes of history to request
|
||||
|
||||
Returns:
|
||||
Tuple of (success, error_message)
|
||||
"""
|
||||
if not self._interface:
|
||||
return False, "Not connected to device"
|
||||
|
||||
if not HAS_MESHTASTIC:
|
||||
return False, "Meshtastic SDK not installed"
|
||||
|
||||
try:
|
||||
from meshtastic import portnums_pb2, storeforward_pb2
|
||||
|
||||
# Find S&F router (look for nodes with router role)
|
||||
router_num = None
|
||||
if self._interface.nodes:
|
||||
for node_id, node_data in self._interface.nodes.items():
|
||||
# Check for router role
|
||||
role = node_data.get('user', {}).get('role')
|
||||
if role in ('ROUTER', 'ROUTER_CLIENT'):
|
||||
if isinstance(node_id, str) and node_id.startswith('!'):
|
||||
router_num = int(node_id[1:], 16)
|
||||
elif isinstance(node_id, int):
|
||||
router_num = node_id
|
||||
break
|
||||
|
||||
if not router_num:
|
||||
return False, "No Store & Forward router found on mesh"
|
||||
|
||||
# Build S&F history request
|
||||
sf_request = storeforward_pb2.StoreAndForward()
|
||||
sf_request.rr = storeforward_pb2.StoreAndForward.RequestResponse.CLIENT_HISTORY
|
||||
sf_request.history.window = window_minutes * 60 # Convert to seconds
|
||||
|
||||
self._interface.sendData(
|
||||
sf_request.SerializeToString(),
|
||||
destinationId=router_num,
|
||||
portNum=portnums_pb2.PortNum.STORE_FORWARD_APP,
|
||||
)
|
||||
|
||||
logger.info(f"Requested S&F history from {self._format_node_id(router_num)} for {window_minutes} minutes")
|
||||
return True, None
|
||||
|
||||
except ImportError:
|
||||
return False, "Store & Forward protobuf not available"
|
||||
except Exception as e:
|
||||
logger.error(f"Error requesting S&F history: {e}")
|
||||
return False, str(e)
|
||||
|
||||
def check_store_forward_available(self) -> dict:
|
||||
"""
|
||||
Check if a Store & Forward router is available.
|
||||
|
||||
Returns:
|
||||
Dict with available status and router info
|
||||
"""
|
||||
result = {
|
||||
'available': False,
|
||||
'router_id': None,
|
||||
'router_name': None,
|
||||
}
|
||||
|
||||
if not self._interface or not self._interface.nodes:
|
||||
return result
|
||||
|
||||
for node_id, node_data in self._interface.nodes.items():
|
||||
role = node_data.get('user', {}).get('role')
|
||||
if role in ('ROUTER', 'ROUTER_CLIENT'):
|
||||
result['available'] = True
|
||||
if isinstance(node_id, str):
|
||||
result['router_id'] = node_id
|
||||
else:
|
||||
result['router_id'] = self._format_node_id(node_id)
|
||||
result['router_name'] = node_data.get('user', {}).get('shortName')
|
||||
break
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# Global client instance
|
||||
_client: MeshtasticClient | None = None
|
||||
|
||||
Reference in New Issue
Block a user