mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
Add full telemetry display in node popups including device metrics (voltage, channel utilization, air TX) and environment sensors (temperature, humidity, barometric pressure). Add traceroute functionality with interactive visualization showing hop paths and SNR values. Includes API endpoints for sending traceroutes and retrieving results, plus a modal UI for displaying route information. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
969 lines
35 KiB
Python
969 lines
35 KiB
Python
"""Meshtastic device management and message handling.
|
|
|
|
This module provides integration with Meshtastic mesh networking devices,
|
|
allowing INTERCEPT to receive and decode messages from LoRa mesh networks.
|
|
|
|
Requires a physical Meshtastic device connected via USB/Serial.
|
|
Install SDK with: pip install meshtastic
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import secrets
|
|
import threading
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from typing import Callable
|
|
|
|
from utils.logging import get_logger
|
|
|
|
logger = get_logger('intercept.meshtastic')
|
|
|
|
# Meshtastic SDK import (optional dependency)
|
|
try:
|
|
import meshtastic
|
|
import meshtastic.serial_interface
|
|
from meshtastic import BROADCAST_ADDR
|
|
from pubsub import pub
|
|
HAS_MESHTASTIC = True
|
|
except ImportError:
|
|
HAS_MESHTASTIC = False
|
|
BROADCAST_ADDR = 0xFFFFFFFF # Fallback if SDK not installed
|
|
logger.warning("Meshtastic SDK not installed. Install with: pip install meshtastic")
|
|
|
|
|
|
@dataclass
|
|
class MeshtasticMessage:
|
|
"""Decoded Meshtastic message."""
|
|
from_id: str
|
|
to_id: str
|
|
message: str | None
|
|
portnum: str
|
|
channel: int
|
|
rssi: int | None
|
|
snr: float | None
|
|
hop_limit: int | None
|
|
timestamp: datetime
|
|
from_name: str | None = None
|
|
to_name: str | None = None
|
|
raw_packet: dict = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
'type': 'meshtastic',
|
|
'from': self.from_id,
|
|
'from_name': self.from_name,
|
|
'to': self.to_id,
|
|
'to_name': self.to_name,
|
|
'message': self.message,
|
|
'text': self.message, # Alias for frontend compatibility
|
|
'portnum': self.portnum,
|
|
'channel': self.channel,
|
|
'rssi': self.rssi,
|
|
'snr': self.snr,
|
|
'hop_limit': self.hop_limit,
|
|
'timestamp': self.timestamp.timestamp(), # Unix seconds for frontend
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class ChannelConfig:
|
|
"""Meshtastic channel configuration."""
|
|
index: int
|
|
name: str
|
|
psk: bytes
|
|
role: int # 0=DISABLED, 1=PRIMARY, 2=SECONDARY
|
|
|
|
def to_dict(self) -> dict:
|
|
"""Convert to dict for API response (hides raw PSK)."""
|
|
role_names = ['DISABLED', 'PRIMARY', 'SECONDARY']
|
|
# Default key is 1 byte (0x01) or the well-known AQ== base64
|
|
is_default = self.psk in (b'\x01', b'')
|
|
return {
|
|
'index': self.index,
|
|
'name': self.name,
|
|
'role': role_names[self.role] if self.role < len(role_names) else 'UNKNOWN',
|
|
'encrypted': len(self.psk) > 1,
|
|
'key_type': self._get_key_type(),
|
|
'is_default_key': is_default,
|
|
}
|
|
|
|
def _get_key_type(self) -> str:
|
|
"""Determine encryption type from key length."""
|
|
if len(self.psk) == 0:
|
|
return 'none'
|
|
elif len(self.psk) == 1:
|
|
return 'default'
|
|
elif len(self.psk) == 16:
|
|
return 'AES-128'
|
|
elif len(self.psk) == 32:
|
|
return 'AES-256'
|
|
else:
|
|
return 'unknown'
|
|
|
|
|
|
@dataclass
|
|
class MeshNode:
|
|
"""Tracked Meshtastic node with position and metadata."""
|
|
num: int
|
|
user_id: str
|
|
long_name: str
|
|
short_name: str
|
|
hw_model: str
|
|
latitude: float | None = None
|
|
longitude: float | None = None
|
|
altitude: int | None = None
|
|
battery_level: int | None = None
|
|
snr: float | None = None
|
|
last_heard: datetime | None = None
|
|
# Device telemetry
|
|
voltage: float | None = None
|
|
channel_utilization: float | None = None
|
|
air_util_tx: float | None = None
|
|
# Environment telemetry
|
|
temperature: float | None = None
|
|
humidity: float | None = None
|
|
barometric_pressure: float | None = None
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
'num': self.num,
|
|
'id': self.user_id or f"!{self.num:08x}",
|
|
'long_name': self.long_name,
|
|
'short_name': self.short_name,
|
|
'hw_model': self.hw_model,
|
|
'latitude': self.latitude,
|
|
'longitude': self.longitude,
|
|
'altitude': self.altitude,
|
|
'battery_level': self.battery_level,
|
|
'snr': self.snr,
|
|
'last_heard': self.last_heard.isoformat() if self.last_heard else None,
|
|
'has_position': self.latitude is not None and self.longitude is not None,
|
|
# Device telemetry
|
|
'voltage': self.voltage,
|
|
'channel_utilization': self.channel_utilization,
|
|
'air_util_tx': self.air_util_tx,
|
|
# Environment telemetry
|
|
'temperature': self.temperature,
|
|
'humidity': self.humidity,
|
|
'barometric_pressure': self.barometric_pressure,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class NodeInfo:
|
|
"""Meshtastic node information."""
|
|
num: int
|
|
user_id: str
|
|
long_name: str
|
|
short_name: str
|
|
hw_model: str
|
|
latitude: float | None
|
|
longitude: float | None
|
|
altitude: int | None
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
'num': self.num,
|
|
'user_id': self.user_id,
|
|
'long_name': self.long_name,
|
|
'short_name': self.short_name,
|
|
'hw_model': self.hw_model,
|
|
'position': {
|
|
'latitude': self.latitude,
|
|
'longitude': self.longitude,
|
|
'altitude': self.altitude,
|
|
} if self.latitude is not None else None,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class TracerouteResult:
|
|
"""Result of a traceroute to a mesh node."""
|
|
destination_id: str
|
|
route: list[str] # Node IDs in forward path
|
|
route_back: list[str] # Return path
|
|
snr_towards: list[float] # SNR per hop (forward)
|
|
snr_back: list[float] # SNR per hop (return)
|
|
timestamp: datetime
|
|
success: bool
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
'destination_id': self.destination_id,
|
|
'route': self.route,
|
|
'route_back': self.route_back,
|
|
'snr_towards': self.snr_towards,
|
|
'snr_back': self.snr_back,
|
|
'timestamp': self.timestamp.isoformat(),
|
|
'success': self.success,
|
|
}
|
|
|
|
|
|
class MeshtasticClient:
|
|
"""Client for connecting to Meshtastic devices."""
|
|
|
|
def __init__(self):
|
|
self._interface = None
|
|
self._running = False
|
|
self._callback: Callable[[MeshtasticMessage], None] | None = None
|
|
self._lock = threading.Lock()
|
|
self._nodes: dict[int, MeshNode] = {} # num -> MeshNode
|
|
self._device_path: str | None = None
|
|
self._error: str | None = None
|
|
self._traceroute_results: list[TracerouteResult] = []
|
|
self._max_traceroute_results = 50
|
|
|
|
@property
|
|
def is_running(self) -> bool:
|
|
return self._running
|
|
|
|
@property
|
|
def device_path(self) -> str | None:
|
|
return self._device_path
|
|
|
|
@property
|
|
def error(self) -> str | None:
|
|
return self._error
|
|
|
|
def set_callback(self, callback: Callable[[MeshtasticMessage], None]) -> None:
|
|
"""Set callback for received messages."""
|
|
self._callback = callback
|
|
|
|
def connect(self, device: str | None = None) -> bool:
|
|
"""
|
|
Connect to a Meshtastic device.
|
|
|
|
Args:
|
|
device: Serial port path (e.g., /dev/ttyUSB0, /dev/ttyACM0).
|
|
If None, auto-discovers first available device.
|
|
|
|
Returns:
|
|
True if connected successfully.
|
|
"""
|
|
if not HAS_MESHTASTIC:
|
|
self._error = "Meshtastic SDK not installed. Install with: pip install meshtastic"
|
|
return False
|
|
|
|
with self._lock:
|
|
if self._running:
|
|
return True
|
|
|
|
try:
|
|
# Subscribe to message events before connecting
|
|
pub.subscribe(self._on_receive, "meshtastic.receive")
|
|
pub.subscribe(self._on_connection, "meshtastic.connection.established")
|
|
pub.subscribe(self._on_disconnect, "meshtastic.connection.lost")
|
|
|
|
# Connect to device
|
|
if device:
|
|
self._interface = meshtastic.serial_interface.SerialInterface(device)
|
|
self._device_path = device
|
|
else:
|
|
# Auto-discover
|
|
self._interface = meshtastic.serial_interface.SerialInterface()
|
|
self._device_path = "auto"
|
|
|
|
self._running = True
|
|
self._error = None
|
|
logger.info(f"Connected to Meshtastic device: {self._device_path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self._error = str(e)
|
|
logger.error(f"Failed to connect to Meshtastic: {e}")
|
|
self._cleanup_subscriptions()
|
|
return False
|
|
|
|
def disconnect(self) -> None:
|
|
"""Disconnect from the Meshtastic device."""
|
|
with self._lock:
|
|
if self._interface:
|
|
try:
|
|
self._interface.close()
|
|
except Exception as e:
|
|
logger.warning(f"Error closing Meshtastic interface: {e}")
|
|
self._interface = None
|
|
|
|
self._cleanup_subscriptions()
|
|
self._running = False
|
|
self._device_path = None
|
|
logger.info("Disconnected from Meshtastic device")
|
|
|
|
def _cleanup_subscriptions(self) -> None:
|
|
"""Unsubscribe from pubsub topics."""
|
|
if HAS_MESHTASTIC:
|
|
try:
|
|
pub.unsubscribe(self._on_receive, "meshtastic.receive")
|
|
except Exception:
|
|
pass
|
|
try:
|
|
pub.unsubscribe(self._on_connection, "meshtastic.connection.established")
|
|
except Exception:
|
|
pass
|
|
try:
|
|
pub.unsubscribe(self._on_disconnect, "meshtastic.connection.lost")
|
|
except Exception:
|
|
pass
|
|
|
|
def _on_connection(self, interface, topic=None) -> None:
|
|
"""Handle connection established event."""
|
|
logger.info("Meshtastic connection established")
|
|
# Sync nodes from device's nodeDB so names are available for messages
|
|
self._sync_nodes_from_interface()
|
|
# Try to set device time from host computer
|
|
self._sync_device_time()
|
|
|
|
def _on_disconnect(self, interface, topic=None) -> None:
|
|
"""Handle connection lost event."""
|
|
logger.warning("Meshtastic connection lost")
|
|
self._running = False
|
|
|
|
def _sync_device_time(self) -> None:
|
|
"""Sync device time from host computer."""
|
|
if not self._interface:
|
|
return
|
|
try:
|
|
# Try to set the device's time using the SDK
|
|
import time
|
|
current_time = int(time.time())
|
|
if hasattr(self._interface, 'localNode') and self._interface.localNode:
|
|
local_node = self._interface.localNode
|
|
if hasattr(local_node, 'setTime'):
|
|
local_node.setTime(current_time)
|
|
logger.info(f"Set device time to {current_time}")
|
|
elif hasattr(self._interface, 'sendAdmin'):
|
|
# Alternative: send admin message with time
|
|
logger.debug("setTime not available, device time not synced")
|
|
else:
|
|
logger.debug("localNode not available, device time not synced")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to sync device time: {e}")
|
|
|
|
def _on_receive(self, packet: dict, interface) -> None:
|
|
"""Handle received packet from Meshtastic device."""
|
|
try:
|
|
decoded = packet.get('decoded', {})
|
|
from_num = packet.get('from', 0)
|
|
to_num = packet.get('to', 0)
|
|
portnum = decoded.get('portnum', 'UNKNOWN')
|
|
|
|
# Track node from packet (always, even for filtered messages)
|
|
self._track_node_from_packet(packet, decoded, portnum)
|
|
|
|
# Parse traceroute responses
|
|
if portnum == 'TRACEROUTE_APP':
|
|
self._handle_traceroute_response(packet, decoded)
|
|
|
|
# Skip callback if none set
|
|
if not self._callback:
|
|
return
|
|
|
|
# Filter out internal protocol messages that aren't useful to users
|
|
ignored_portnums = {
|
|
'ROUTING_APP', # Mesh routing/acknowledgments
|
|
'ADMIN_APP', # Admin commands
|
|
'REPLY_APP', # Internal replies
|
|
'STORE_FORWARD_APP', # Store and forward protocol
|
|
'RANGE_TEST_APP', # Range testing
|
|
'PAXCOUNTER_APP', # People counter
|
|
'REMOTE_HARDWARE_APP', # Remote hardware control
|
|
'SIMULATOR_APP', # Simulator
|
|
'MAP_REPORT_APP', # Map reporting
|
|
'TELEMETRY_APP', # Device telemetry (battery, etc.) - too noisy
|
|
'POSITION_APP', # Position updates - used for map, not messages
|
|
'NODEINFO_APP', # Node info - used for tracking, not messages
|
|
}
|
|
if portnum in ignored_portnums:
|
|
logger.debug(f"Ignoring {portnum} message from {from_num}")
|
|
return
|
|
|
|
# Extract text message if present
|
|
message = None
|
|
if portnum == 'TEXT_MESSAGE_APP':
|
|
message = decoded.get('text')
|
|
elif portnum in ('WAYPOINT_APP', 'TRACEROUTE_APP'):
|
|
# Show these as informational messages
|
|
message = f"[{portnum}]"
|
|
elif 'payload' in decoded:
|
|
# For other message types, include payload info
|
|
message = f"[{portnum}]"
|
|
|
|
# Look up node names - try cache first, then SDK's nodeDB
|
|
from_name = self._lookup_node_name(from_num)
|
|
to_name = self._lookup_node_name(to_num) if to_num != BROADCAST_ADDR else None
|
|
|
|
msg = MeshtasticMessage(
|
|
from_id=self._format_node_id(from_num),
|
|
to_id=self._format_node_id(to_num),
|
|
message=message,
|
|
portnum=portnum,
|
|
channel=packet.get('channel', 0),
|
|
rssi=packet.get('rxRssi'),
|
|
snr=packet.get('rxSnr'),
|
|
hop_limit=packet.get('hopLimit'),
|
|
timestamp=datetime.now(timezone.utc),
|
|
from_name=from_name,
|
|
to_name=to_name,
|
|
raw_packet=packet,
|
|
)
|
|
|
|
self._callback(msg)
|
|
logger.debug(f"Received: {msg.from_id} -> {msg.to_id}: {msg.portnum}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing Meshtastic packet: {e}")
|
|
|
|
def _track_node_from_packet(self, packet: dict, decoded: dict, portnum: str) -> None:
|
|
"""Update node tracking from received packet."""
|
|
from_num = packet.get('from', 0)
|
|
if from_num == 0 or from_num == 0xFFFFFFFF:
|
|
return
|
|
|
|
now = datetime.now(timezone.utc)
|
|
|
|
# Get or create node entry
|
|
if from_num not in self._nodes:
|
|
self._nodes[from_num] = MeshNode(
|
|
num=from_num,
|
|
user_id=f"!{from_num:08x}",
|
|
long_name='',
|
|
short_name='',
|
|
hw_model='UNKNOWN',
|
|
)
|
|
|
|
node = self._nodes[from_num]
|
|
node.last_heard = now
|
|
node.snr = packet.get('rxSnr', node.snr)
|
|
|
|
# Parse NODEINFO_APP for user details
|
|
if portnum == 'NODEINFO_APP':
|
|
user = decoded.get('user', {})
|
|
if user:
|
|
node.long_name = user.get('longName', node.long_name)
|
|
node.short_name = user.get('shortName', node.short_name)
|
|
node.hw_model = user.get('hwModel', node.hw_model)
|
|
if user.get('id'):
|
|
node.user_id = user.get('id')
|
|
|
|
# Parse POSITION_APP for location
|
|
elif portnum == 'POSITION_APP':
|
|
position = decoded.get('position', {})
|
|
if position:
|
|
lat = position.get('latitude') or position.get('latitudeI')
|
|
lon = position.get('longitude') or position.get('longitudeI')
|
|
|
|
# Handle integer format (latitudeI/longitudeI are in 1e-7 degrees)
|
|
if isinstance(lat, int) and abs(lat) > 1000:
|
|
lat = lat / 1e7
|
|
if isinstance(lon, int) and abs(lon) > 1000:
|
|
lon = lon / 1e7
|
|
|
|
if lat is not None and lon is not None:
|
|
node.latitude = lat
|
|
node.longitude = lon
|
|
node.altitude = position.get('altitude', node.altitude)
|
|
|
|
# Parse TELEMETRY_APP for battery and other metrics
|
|
elif portnum == 'TELEMETRY_APP':
|
|
telemetry = decoded.get('telemetry', {})
|
|
|
|
# Device metrics
|
|
device_metrics = telemetry.get('deviceMetrics', {})
|
|
if device_metrics:
|
|
battery = device_metrics.get('batteryLevel')
|
|
if battery is not None:
|
|
node.battery_level = battery
|
|
voltage = device_metrics.get('voltage')
|
|
if voltage is not None:
|
|
node.voltage = voltage
|
|
channel_util = device_metrics.get('channelUtilization')
|
|
if channel_util is not None:
|
|
node.channel_utilization = channel_util
|
|
air_util = device_metrics.get('airUtilTx')
|
|
if air_util is not None:
|
|
node.air_util_tx = air_util
|
|
|
|
# Environment metrics
|
|
env_metrics = telemetry.get('environmentMetrics', {})
|
|
if env_metrics:
|
|
temp = env_metrics.get('temperature')
|
|
if temp is not None:
|
|
node.temperature = temp
|
|
humidity = env_metrics.get('relativeHumidity')
|
|
if humidity is not None:
|
|
node.humidity = humidity
|
|
pressure = env_metrics.get('barometricPressure')
|
|
if pressure is not None:
|
|
node.barometric_pressure = pressure
|
|
|
|
def _lookup_node_name(self, node_num: int) -> str | None:
|
|
"""Look up a node's name by its number."""
|
|
if node_num == 0 or node_num == BROADCAST_ADDR:
|
|
return None
|
|
|
|
# Try our cache first
|
|
if node_num in self._nodes:
|
|
node = self._nodes[node_num]
|
|
name = node.short_name or node.long_name
|
|
if name:
|
|
return name
|
|
|
|
# Try SDK's nodeDB with various key formats
|
|
if self._interface and hasattr(self._interface, 'nodes') and self._interface.nodes:
|
|
nodes = self._interface.nodes
|
|
|
|
# Try direct lookup with different key formats
|
|
for key in [node_num, f"!{node_num:08x}", f"!{node_num:x}", str(node_num)]:
|
|
if key in nodes:
|
|
user = nodes[key].get('user', {})
|
|
name = user.get('shortName') or user.get('longName')
|
|
if name:
|
|
logger.debug(f"Found name '{name}' for node {node_num} with key {key}")
|
|
return name
|
|
|
|
# Search through all nodes by num field
|
|
for key, node_data in nodes.items():
|
|
if node_data.get('num') == node_num:
|
|
user = node_data.get('user', {})
|
|
name = user.get('shortName') or user.get('longName')
|
|
if name:
|
|
logger.debug(f"Found name '{name}' for node {node_num} by search")
|
|
return name
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def _format_node_id(node_num: int) -> str:
|
|
"""Format node number as hex string."""
|
|
if node_num == 0xFFFFFFFF:
|
|
return "^all"
|
|
return f"!{node_num:08x}"
|
|
|
|
def get_node_info(self) -> NodeInfo | None:
|
|
"""Get local node information."""
|
|
if not self._interface:
|
|
return None
|
|
try:
|
|
node = self._interface.getMyNodeInfo()
|
|
user = node.get('user', {})
|
|
position = node.get('position', {})
|
|
|
|
return NodeInfo(
|
|
num=node.get('num', 0),
|
|
user_id=user.get('id', ''),
|
|
long_name=user.get('longName', ''),
|
|
short_name=user.get('shortName', ''),
|
|
hw_model=user.get('hwModel', 'UNKNOWN'),
|
|
latitude=position.get('latitude'),
|
|
longitude=position.get('longitude'),
|
|
altitude=position.get('altitude'),
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error getting node info: {e}")
|
|
return None
|
|
|
|
def get_nodes(self) -> list[MeshNode]:
|
|
"""Get all tracked nodes."""
|
|
# Also pull nodes from the SDK's nodeDB if available
|
|
self._sync_nodes_from_interface()
|
|
return list(self._nodes.values())
|
|
|
|
def _sync_nodes_from_interface(self) -> None:
|
|
"""Sync nodes from the Meshtastic SDK's nodeDB."""
|
|
if not self._interface:
|
|
return
|
|
|
|
try:
|
|
nodes = self._interface.nodes
|
|
if not nodes:
|
|
return
|
|
|
|
for node_id, node_data in nodes.items():
|
|
# Skip if it's a string key like '!abcd1234'
|
|
if isinstance(node_id, str):
|
|
try:
|
|
num = int(node_id[1:], 16) if node_id.startswith('!') else int(node_id)
|
|
except ValueError:
|
|
continue
|
|
else:
|
|
num = node_id
|
|
|
|
user = node_data.get('user', {})
|
|
position = node_data.get('position', {})
|
|
|
|
# Get or create node
|
|
if num not in self._nodes:
|
|
self._nodes[num] = MeshNode(
|
|
num=num,
|
|
user_id=user.get('id', f"!{num:08x}"),
|
|
long_name=user.get('longName', ''),
|
|
short_name=user.get('shortName', ''),
|
|
hw_model=user.get('hwModel', 'UNKNOWN'),
|
|
)
|
|
|
|
node = self._nodes[num]
|
|
|
|
# Update from SDK data
|
|
if user:
|
|
node.long_name = user.get('longName', node.long_name) or node.long_name
|
|
node.short_name = user.get('shortName', node.short_name) or node.short_name
|
|
node.hw_model = user.get('hwModel', node.hw_model) or node.hw_model
|
|
if user.get('id'):
|
|
node.user_id = user.get('id')
|
|
|
|
if position:
|
|
lat = position.get('latitude')
|
|
lon = position.get('longitude')
|
|
if lat is not None and lon is not None:
|
|
node.latitude = lat
|
|
node.longitude = lon
|
|
node.altitude = position.get('altitude', node.altitude)
|
|
|
|
# Update last heard from SDK
|
|
last_heard = node_data.get('lastHeard')
|
|
if last_heard:
|
|
node.last_heard = datetime.fromtimestamp(last_heard, tz=timezone.utc)
|
|
|
|
# Update SNR
|
|
node.snr = node_data.get('snr', node.snr)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error syncing nodes from interface: {e}")
|
|
|
|
def get_channels(self) -> list[ChannelConfig]:
|
|
"""Get all configured channels."""
|
|
if not self._interface:
|
|
return []
|
|
|
|
channels = []
|
|
try:
|
|
for i, ch in enumerate(self._interface.localNode.channels):
|
|
if ch.role != 0: # 0 = DISABLED
|
|
channels.append(ChannelConfig(
|
|
index=i,
|
|
name=ch.settings.name or f"Channel {i}",
|
|
psk=bytes(ch.settings.psk) if ch.settings.psk else b'',
|
|
role=ch.role,
|
|
))
|
|
except Exception as e:
|
|
logger.error(f"Error getting channels: {e}")
|
|
return channels
|
|
|
|
def send_text(self, text: str, channel: int = 0,
|
|
destination: str | int | None = None) -> tuple[bool, str]:
|
|
"""
|
|
Send a text message to the mesh network.
|
|
|
|
Args:
|
|
text: Message text (max 237 characters)
|
|
channel: Channel index to send on (0-7)
|
|
destination: Target node ID (string like "!a1b2c3d4" or int).
|
|
None or "^all" for broadcast.
|
|
|
|
Returns:
|
|
Tuple of (success, error_message)
|
|
"""
|
|
if not self._interface:
|
|
return False, "Not connected to device"
|
|
|
|
if not text or len(text) > 237:
|
|
return False, "Message must be 1-237 characters"
|
|
|
|
try:
|
|
# Parse destination - use broadcast address for None/^all
|
|
dest_id = BROADCAST_ADDR # Default to broadcast
|
|
|
|
if destination:
|
|
if isinstance(destination, int):
|
|
dest_id = destination
|
|
elif destination == "^all":
|
|
dest_id = BROADCAST_ADDR
|
|
elif destination.startswith('!'):
|
|
dest_id = int(destination[1:], 16)
|
|
else:
|
|
# Try parsing as integer
|
|
try:
|
|
dest_id = int(destination)
|
|
except ValueError:
|
|
return False, f"Invalid destination: {destination}"
|
|
|
|
# Send the message using sendData for more control
|
|
logger.debug(f"Calling sendData: text='{text[:30]}', dest={dest_id}, channel={channel}")
|
|
|
|
# Use sendData with TEXT_MESSAGE_APP portnum
|
|
# This gives us more control over the packet
|
|
from meshtastic import portnums_pb2
|
|
|
|
self._interface.sendData(
|
|
text.encode('utf-8'),
|
|
destinationId=dest_id,
|
|
portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
|
|
channelIndex=channel,
|
|
)
|
|
logger.debug("sendData completed")
|
|
|
|
dest_str = "^all" if dest_id == BROADCAST_ADDR else f"!{dest_id:08x}"
|
|
logger.info(f"Sent message to {dest_str} on channel {channel}: {text[:50]}...")
|
|
return True, None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending message: {e}")
|
|
return False, str(e)
|
|
|
|
def set_channel(self, index: int, name: str | None = None,
|
|
psk: str | None = None) -> tuple[bool, str]:
|
|
"""
|
|
Configure a channel with encryption key.
|
|
|
|
Args:
|
|
index: Channel index (0-7)
|
|
name: Channel name (optional)
|
|
psk: Pre-shared key in one of these formats:
|
|
- "none" - disable encryption
|
|
- "default" - use default (public) key
|
|
- "random" - generate new AES-256 key
|
|
- "base64:..." - base64-encoded key (16 or 32 bytes)
|
|
- "0x..." - hex-encoded key (16 or 32 bytes)
|
|
- "simple:passphrase" - derive key from passphrase (AES-256)
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
if not self._interface:
|
|
return False, "Not connected to device"
|
|
|
|
if not 0 <= index <= 7:
|
|
return False, f"Invalid channel index: {index}. Must be 0-7."
|
|
|
|
try:
|
|
ch = self._interface.localNode.channels[index]
|
|
|
|
if name is not None:
|
|
ch.settings.name = name
|
|
|
|
if psk is not None:
|
|
psk_bytes = self._parse_psk(psk)
|
|
if psk_bytes is None:
|
|
return False, f"Invalid PSK format: {psk}"
|
|
ch.settings.psk = psk_bytes
|
|
|
|
# Enable channel if it was disabled
|
|
if ch.role == 0:
|
|
ch.role = 2 # SECONDARY (1 = PRIMARY, only one allowed)
|
|
|
|
# Write config to device
|
|
self._interface.localNode.writeChannel(index)
|
|
logger.info(f"Channel {index} configured: {name or ch.settings.name}")
|
|
return True, f"Channel {index} configured successfully"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error setting channel: {e}")
|
|
return False, str(e)
|
|
|
|
def _parse_psk(self, psk: str) -> bytes | None:
|
|
"""
|
|
Parse PSK string into bytes.
|
|
|
|
Supported formats:
|
|
- "none" - no encryption (empty key)
|
|
- "default" - use default public key (1 byte)
|
|
- "random" - generate random 32-byte AES-256 key
|
|
- "base64:..." - base64-encoded key
|
|
- "0x..." - hex-encoded key
|
|
- "simple:passphrase" - SHA-256 hash of passphrase
|
|
"""
|
|
psk = psk.strip()
|
|
|
|
if psk.lower() == 'none':
|
|
return b''
|
|
|
|
if psk.lower() == 'default':
|
|
# Default key (1 byte = use default)
|
|
return b'\x01'
|
|
|
|
if psk.lower() == 'random':
|
|
# Generate random 32-byte key
|
|
return secrets.token_bytes(32)
|
|
|
|
if psk.startswith('base64:'):
|
|
try:
|
|
decoded = base64.b64decode(psk[7:])
|
|
if len(decoded) not in (0, 1, 16, 32):
|
|
logger.warning(f"PSK length {len(decoded)} is non-standard")
|
|
return decoded
|
|
except Exception:
|
|
return None
|
|
|
|
if psk.startswith('0x'):
|
|
try:
|
|
decoded = bytes.fromhex(psk[2:])
|
|
if len(decoded) not in (0, 1, 16, 32):
|
|
logger.warning(f"PSK length {len(decoded)} is non-standard")
|
|
return decoded
|
|
except Exception:
|
|
return None
|
|
|
|
if psk.startswith('simple:'):
|
|
# Hash passphrase to create 32-byte AES-256 key
|
|
passphrase = psk[7:].encode('utf-8')
|
|
return hashlib.sha256(passphrase).digest()
|
|
|
|
# Try as raw base64 (for compatibility)
|
|
try:
|
|
decoded = base64.b64decode(psk)
|
|
if len(decoded) in (0, 1, 16, 32):
|
|
return decoded
|
|
except Exception:
|
|
pass
|
|
|
|
return None
|
|
|
|
def send_traceroute(self, destination: str | int, hop_limit: int = 7) -> tuple[bool, str]:
|
|
"""
|
|
Send a traceroute request to a destination node.
|
|
|
|
Args:
|
|
destination: Target node ID (string like "!a1b2c3d4" or int)
|
|
hop_limit: Maximum number of hops (1-7, default 7)
|
|
|
|
Returns:
|
|
Tuple of (success, error_message)
|
|
"""
|
|
if not self._interface:
|
|
return False, "Not connected to device"
|
|
|
|
if not HAS_MESHTASTIC:
|
|
return False, "Meshtastic SDK not installed"
|
|
|
|
# Validate hop limit
|
|
hop_limit = max(1, min(7, hop_limit))
|
|
|
|
try:
|
|
# Parse destination
|
|
if isinstance(destination, int):
|
|
dest_id = destination
|
|
elif destination.startswith('!'):
|
|
dest_id = int(destination[1:], 16)
|
|
else:
|
|
try:
|
|
dest_id = int(destination)
|
|
except ValueError:
|
|
return False, f"Invalid destination: {destination}"
|
|
|
|
if dest_id == BROADCAST_ADDR:
|
|
return False, "Cannot traceroute to broadcast address"
|
|
|
|
# Use the SDK's sendTraceRoute method
|
|
logger.info(f"Sending traceroute to {self._format_node_id(dest_id)} with hop_limit={hop_limit}")
|
|
self._interface.sendTraceRoute(dest_id, hopLimit=hop_limit)
|
|
|
|
return True, None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending traceroute: {e}")
|
|
return False, str(e)
|
|
|
|
def _handle_traceroute_response(self, packet: dict, decoded: dict) -> None:
|
|
"""Handle incoming traceroute response."""
|
|
try:
|
|
from_num = packet.get('from', 0)
|
|
route_discovery = decoded.get('routeDiscovery', {})
|
|
|
|
# Extract route information
|
|
route = route_discovery.get('route', [])
|
|
route_back = route_discovery.get('routeBack', [])
|
|
snr_towards = route_discovery.get('snrTowards', [])
|
|
snr_back = route_discovery.get('snrBack', [])
|
|
|
|
# Convert node numbers to IDs
|
|
route_ids = [self._format_node_id(n) for n in route]
|
|
route_back_ids = [self._format_node_id(n) for n in route_back]
|
|
|
|
# Convert SNR values (stored as int8, need to convert)
|
|
snr_towards_float = [float(s) / 4.0 if isinstance(s, int) else float(s) for s in snr_towards]
|
|
snr_back_float = [float(s) / 4.0 if isinstance(s, int) else float(s) for s in snr_back]
|
|
|
|
result = TracerouteResult(
|
|
destination_id=self._format_node_id(from_num),
|
|
route=route_ids,
|
|
route_back=route_back_ids,
|
|
snr_towards=snr_towards_float,
|
|
snr_back=snr_back_float,
|
|
timestamp=datetime.now(timezone.utc),
|
|
success=len(route) > 0 or len(route_back) > 0,
|
|
)
|
|
|
|
# Store result
|
|
self._traceroute_results.append(result)
|
|
if len(self._traceroute_results) > self._max_traceroute_results:
|
|
self._traceroute_results.pop(0)
|
|
|
|
logger.info(f"Traceroute response from {result.destination_id}: route={route_ids}, route_back={route_back_ids}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error handling traceroute response: {e}")
|
|
|
|
def get_traceroute_results(self, limit: int | None = None) -> list[TracerouteResult]:
|
|
"""
|
|
Get recent traceroute results.
|
|
|
|
Args:
|
|
limit: Maximum number of results to return (None for all)
|
|
|
|
Returns:
|
|
List of TracerouteResult objects, most recent first
|
|
"""
|
|
results = list(reversed(self._traceroute_results))
|
|
if limit:
|
|
results = results[:limit]
|
|
return results
|
|
|
|
|
|
# Global client instance
|
|
_client: MeshtasticClient | None = None
|
|
|
|
|
|
def get_meshtastic_client() -> MeshtasticClient | None:
|
|
"""Get the global Meshtastic client instance."""
|
|
return _client
|
|
|
|
|
|
def start_meshtastic(device: str | None = None,
|
|
callback: Callable[[MeshtasticMessage], None] | None = None) -> bool:
|
|
"""
|
|
Start the Meshtastic client.
|
|
|
|
Args:
|
|
device: Serial port path (optional, auto-discovers if not provided)
|
|
callback: Function to call when messages are received
|
|
|
|
Returns:
|
|
True if started successfully
|
|
"""
|
|
global _client
|
|
|
|
if _client and _client.is_running:
|
|
return True
|
|
|
|
_client = MeshtasticClient()
|
|
if callback:
|
|
_client.set_callback(callback)
|
|
|
|
return _client.connect(device)
|
|
|
|
|
|
def stop_meshtastic() -> None:
|
|
"""Stop the Meshtastic client."""
|
|
global _client
|
|
if _client:
|
|
_client.disconnect()
|
|
_client = None
|
|
|
|
|
|
def is_meshtastic_available() -> bool:
|
|
"""Check if Meshtastic SDK is installed."""
|
|
return HAS_MESHTASTIC
|