feat: Add Meshtastic mesh network integration

Add support for connecting to Meshtastic LoRa mesh devices via USB/Serial.
Includes routes for device connection, channel configuration with encryption,
and SSE streaming of received messages.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-01-27 22:17:40 +00:00
parent 04b73596ea
commit 84b424b02e
3 changed files with 1313 additions and 0 deletions
+386
View File
@@ -0,0 +1,386 @@
"""Meshtastic mesh network routes.
Provides endpoints for connecting to Meshtastic devices, configuring
channels with encryption keys, and streaming received messages.
Requires a physical Meshtastic device (Heltec, T-Beam, RAK, etc.)
connected via USB/Serial.
"""
from __future__ import annotations
import queue
import time
from typing import Generator
from flask import Blueprint, jsonify, request, Response
from utils.logging import get_logger
from utils.sse import format_sse
from utils.meshtastic import (
get_meshtastic_client,
start_meshtastic,
stop_meshtastic,
is_meshtastic_available,
MeshtasticMessage,
)
logger = get_logger('intercept.meshtastic')
meshtastic_bp = Blueprint('meshtastic', __name__, url_prefix='/meshtastic')
# Queue for SSE message streaming
_mesh_queue: queue.Queue = queue.Queue(maxsize=500)
# Store recent messages for history
_recent_messages: list[dict] = []
MAX_HISTORY = 500
def _message_callback(msg: MeshtasticMessage) -> None:
"""Callback to queue messages for SSE stream."""
msg_dict = msg.to_dict()
# Add to history
_recent_messages.append(msg_dict)
if len(_recent_messages) > MAX_HISTORY:
_recent_messages.pop(0)
# Queue for SSE
try:
_mesh_queue.put_nowait(msg_dict)
except queue.Full:
try:
_mesh_queue.get_nowait()
_mesh_queue.put_nowait(msg_dict)
except queue.Empty:
pass
@meshtastic_bp.route('/status')
def get_status():
"""
Get Meshtastic connection status.
Returns:
JSON with connection status, device info, and node information.
"""
if not is_meshtastic_available():
return jsonify({
'available': False,
'running': False,
'error': 'Meshtastic SDK not installed. Install with: pip install meshtastic'
})
client = get_meshtastic_client()
if not client:
return jsonify({
'available': True,
'running': False,
'device': None,
'node_info': None,
})
node_info = client.get_node_info() if client.is_running else None
return jsonify({
'available': True,
'running': client.is_running,
'device': client.device_path,
'error': client.error,
'node_info': node_info.to_dict() if node_info else None,
})
@meshtastic_bp.route('/start', methods=['POST'])
def start_mesh():
"""
Start Meshtastic listener.
Connects to a Meshtastic device and begins receiving messages.
The device must be connected via USB/Serial.
JSON body (optional):
{
"device": "/dev/ttyUSB0" // Serial port path. Auto-discovers if not provided.
}
Returns:
JSON with connection status.
"""
if not is_meshtastic_available():
return jsonify({
'status': 'error',
'message': 'Meshtastic SDK not installed. Install with: pip install meshtastic'
}), 400
client = get_meshtastic_client()
if client and client.is_running:
return jsonify({
'status': 'already_running',
'device': client.device_path
})
# Clear queue and history
while not _mesh_queue.empty():
try:
_mesh_queue.get_nowait()
except queue.Empty:
break
_recent_messages.clear()
# Get optional device path
data = request.get_json(silent=True) or {}
device = data.get('device')
# Validate device path if provided
if device:
device = str(device).strip()
if not device:
device = None
# Start client
success = start_meshtastic(device=device, callback=_message_callback)
if success:
client = get_meshtastic_client()
node_info = client.get_node_info() if client else None
return jsonify({
'status': 'started',
'device': client.device_path if client else None,
'node_info': node_info.to_dict() if node_info else None,
})
else:
client = get_meshtastic_client()
return jsonify({
'status': 'error',
'message': client.error if client else 'Failed to connect to Meshtastic device'
}), 500
@meshtastic_bp.route('/stop', methods=['POST'])
def stop_mesh():
"""
Stop Meshtastic listener.
Disconnects from the Meshtastic device and stops receiving messages.
Returns:
JSON confirmation.
"""
stop_meshtastic()
return jsonify({'status': 'stopped'})
@meshtastic_bp.route('/channels')
def get_channels():
"""
Get configured channels on the connected device.
Returns:
JSON with list of channel configurations.
Note: PSK values are not returned for security - only encryption status.
"""
client = get_meshtastic_client()
if not client or not client.is_running:
return jsonify({
'status': 'error',
'message': 'Not connected to Meshtastic device'
}), 400
channels = client.get_channels()
return jsonify({
'status': 'ok',
'channels': [ch.to_dict() for ch in channels],
'count': len(channels)
})
@meshtastic_bp.route('/channels/<int:index>', methods=['POST'])
def configure_channel(index: int):
"""
Configure a channel with name and/or encryption key.
This allows joining encrypted channels by providing the PSK.
The configuration is written to the connected Meshtastic device.
Args:
index: Channel index (0-7). Channel 0 is typically the primary channel.
JSON body:
{
"name": "MyChannel", // Optional: Channel name
"psk": "base64:ABC123..." // Optional: Encryption key
}
PSK formats:
- "none" : Disable encryption
- "default" : Use default public key (NOT SECURE - known key)
- "random" : Generate new random AES-256 key
- "base64:..." : Base64-encoded 16-byte (AES-128) or 32-byte (AES-256) key
- "0x..." : Hex-encoded key
- "simple:passphrase" : Derive AES-256 key from passphrase using SHA-256
Returns:
JSON with configuration result.
Security note:
The "default" key is publicly known (shipped in source code).
Use "random" or provide your own key for secure communications.
"""
client = get_meshtastic_client()
if not client or not client.is_running:
return jsonify({
'status': 'error',
'message': 'Not connected to Meshtastic device'
}), 400
if not 0 <= index <= 7:
return jsonify({
'status': 'error',
'message': 'Channel index must be 0-7'
}), 400
data = request.get_json(silent=True) or {}
name = data.get('name')
psk = data.get('psk')
if not name and not psk:
return jsonify({
'status': 'error',
'message': 'Must provide name and/or psk'
}), 400
# Sanitize name if provided
if name:
name = str(name).strip()[:12] # Meshtastic channel names max 12 chars
# Validate PSK format if provided
if psk:
psk = str(psk).strip()
success, message = client.set_channel(index, name=name, psk=psk)
if success:
# Return updated channel info
channels = client.get_channels()
updated = next((ch for ch in channels if ch.index == index), None)
return jsonify({
'status': 'ok',
'message': message,
'channel': updated.to_dict() if updated else None
})
else:
return jsonify({
'status': 'error',
'message': message
}), 500
@meshtastic_bp.route('/messages')
def get_messages():
"""
Get recent message history.
Returns the most recent messages received since the listener was started.
Limited to the last 500 messages.
Query parameters:
limit: Maximum number of messages to return (default: all)
channel: Filter by channel index (optional)
Returns:
JSON with message list.
"""
limit = request.args.get('limit', type=int)
channel = request.args.get('channel', type=int)
messages = _recent_messages.copy()
# Filter by channel if specified
if channel is not None:
messages = [m for m in messages if m.get('channel') == channel]
# Apply limit
if limit and limit > 0:
messages = messages[-limit:]
return jsonify({
'status': 'ok',
'messages': messages,
'count': len(messages)
})
@meshtastic_bp.route('/stream')
def stream_messages():
"""
SSE stream of Meshtastic messages.
Provides real-time Server-Sent Events stream of incoming messages.
Connect to this endpoint with EventSource to receive live updates.
Event format:
data: {"type": "meshtastic", "from": "!a1b2c3d4", "message": "Hello", ...}
Keepalive events are sent every 30 seconds to maintain the connection.
Returns:
SSE stream (text/event-stream)
"""
def generate() -> Generator[str, None, None]:
last_keepalive = time.time()
keepalive_interval = 30.0
while True:
try:
msg = _mesh_queue.get(timeout=1)
last_keepalive = time.time()
yield format_sse(msg)
except queue.Empty:
now = time.time()
if now - last_keepalive >= keepalive_interval:
yield format_sse({'type': 'keepalive'})
last_keepalive = now
response = Response(generate(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
response.headers['Connection'] = 'keep-alive'
return response
@meshtastic_bp.route('/node')
def get_node():
"""
Get local node information.
Returns information about the connected Meshtastic device including
its ID, name, hardware model, and current position (if available).
Returns:
JSON with node information.
"""
client = get_meshtastic_client()
if not client or not client.is_running:
return jsonify({
'status': 'error',
'message': 'Not connected to Meshtastic device'
}), 400
node_info = client.get_node_info()
if node_info:
return jsonify({
'status': 'ok',
'node': node_info.to_dict()
})
else:
return jsonify({
'status': 'error',
'message': 'Failed to get node information'
}), 500
+451
View File
@@ -0,0 +1,451 @@
"""Tests for Meshtastic integration.
Tests cover:
- MeshtasticClient initialization and state management
- PSK parsing (various formats)
- Message callback handling
- Route endpoints (mocked)
- Graceful degradation when SDK not installed
"""
import json
import pytest
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime, timezone
# =============================================================================
# Utility Module Tests
# =============================================================================
class TestMeshtasticAvailability:
"""Tests for SDK availability checks."""
def test_is_meshtastic_available_returns_bool(self):
"""is_meshtastic_available should return a boolean."""
from utils.meshtastic import is_meshtastic_available
result = is_meshtastic_available()
assert isinstance(result, bool)
class TestMeshtasticMessage:
"""Tests for MeshtasticMessage dataclass."""
def test_message_to_dict(self):
"""MeshtasticMessage should convert to dictionary."""
from utils.meshtastic import MeshtasticMessage
msg = MeshtasticMessage(
from_id='!a1b2c3d4',
to_id='^all',
message='Hello mesh!',
portnum='TEXT_MESSAGE_APP',
channel=0,
rssi=-95,
snr=-3.5,
hop_limit=3,
timestamp=datetime(2026, 1, 27, 12, 0, 0, tzinfo=timezone.utc),
)
d = msg.to_dict()
assert d['type'] == 'meshtastic'
assert d['from'] == '!a1b2c3d4'
assert d['to'] == '^all'
assert d['message'] == 'Hello mesh!'
assert d['portnum'] == 'TEXT_MESSAGE_APP'
assert d['channel'] == 0
assert d['rssi'] == -95
assert d['snr'] == -3.5
assert d['hop_limit'] == 3
assert '2026-01-27' in d['timestamp']
def test_message_with_none_values(self):
"""MeshtasticMessage should handle None values."""
from utils.meshtastic import MeshtasticMessage
msg = MeshtasticMessage(
from_id='!00000001',
to_id='!00000002',
message=None,
portnum='POSITION_APP',
channel=1,
rssi=None,
snr=None,
hop_limit=None,
timestamp=datetime.now(timezone.utc),
)
d = msg.to_dict()
assert d['message'] is None
assert d['rssi'] is None
assert d['snr'] is None
class TestChannelConfig:
"""Tests for ChannelConfig dataclass."""
def test_channel_to_dict_hides_psk(self):
"""ChannelConfig.to_dict should not expose raw PSK."""
from utils.meshtastic import ChannelConfig
config = ChannelConfig(
index=0,
name='Primary',
psk=b'\x01\x02\x03\x04' * 8, # 32-byte key
role=1, # PRIMARY
)
d = config.to_dict()
assert 'psk' not in d # Raw PSK should not be in dict
assert d['index'] == 0
assert d['name'] == 'Primary'
assert d['role'] == 'PRIMARY'
assert d['encrypted'] is True
assert d['key_type'] == 'AES-256'
def test_channel_default_key_detection(self):
"""ChannelConfig should detect default key."""
from utils.meshtastic import ChannelConfig
# Default key is single byte 0x01
config = ChannelConfig(index=0, name='Test', psk=b'\x01', role=1)
d = config.to_dict()
assert d['is_default_key'] is True
assert d['key_type'] == 'default'
def test_channel_aes128_detection(self):
"""ChannelConfig should detect AES-128 key."""
from utils.meshtastic import ChannelConfig
config = ChannelConfig(index=0, name='Test', psk=b'0' * 16, role=1)
d = config.to_dict()
assert d['key_type'] == 'AES-128'
assert d['encrypted'] is True
def test_channel_no_encryption(self):
"""ChannelConfig should detect no encryption."""
from utils.meshtastic import ChannelConfig
config = ChannelConfig(index=0, name='Test', psk=b'', role=1)
d = config.to_dict()
assert d['key_type'] == 'none'
assert d['encrypted'] is False
class TestPSKParsing:
"""Tests for PSK format parsing."""
def test_parse_psk_none(self):
"""Should parse 'none' as empty bytes."""
from utils.meshtastic import MeshtasticClient
client = MeshtasticClient()
result = client._parse_psk('none')
assert result == b''
def test_parse_psk_default(self):
"""Should parse 'default' as single byte."""
from utils.meshtastic import MeshtasticClient
client = MeshtasticClient()
result = client._parse_psk('default')
assert result == b'\x01'
def test_parse_psk_random(self):
"""Should generate 32 random bytes for 'random'."""
from utils.meshtastic import MeshtasticClient
client = MeshtasticClient()
result = client._parse_psk('random')
assert len(result) == 32
# Verify it's actually random (two calls should differ)
result2 = client._parse_psk('random')
assert result != result2
def test_parse_psk_base64(self):
"""Should decode base64 PSK."""
from utils.meshtastic import MeshtasticClient
import base64
client = MeshtasticClient()
# 32-byte key encoded as base64
key = b'A' * 32
encoded = 'base64:' + base64.b64encode(key).decode()
result = client._parse_psk(encoded)
assert result == key
def test_parse_psk_hex(self):
"""Should decode hex PSK."""
from utils.meshtastic import MeshtasticClient
client = MeshtasticClient()
# 16-byte key as hex
result = client._parse_psk('0x' + '41' * 16)
assert result == b'A' * 16
def test_parse_psk_simple_passphrase(self):
"""Should hash simple passphrase to 32-byte key."""
from utils.meshtastic import MeshtasticClient
import hashlib
client = MeshtasticClient()
result = client._parse_psk('simple:MySecretPassword')
expected = hashlib.sha256(b'MySecretPassword').digest()
assert result == expected
assert len(result) == 32
def test_parse_psk_invalid(self):
"""Should return None for invalid PSK format."""
from utils.meshtastic import MeshtasticClient
client = MeshtasticClient()
assert client._parse_psk('base64:!!!invalid!!!') is None
assert client._parse_psk('0xZZZZ') is None
def test_parse_psk_raw_base64(self):
"""Should accept raw base64 without prefix."""
from utils.meshtastic import MeshtasticClient
import base64
client = MeshtasticClient()
key = b'B' * 16
encoded = base64.b64encode(key).decode()
result = client._parse_psk(encoded)
assert result == key
class TestNodeIdFormatting:
"""Tests for node ID formatting."""
def test_format_regular_node(self):
"""Should format regular node as hex."""
from utils.meshtastic import MeshtasticClient
result = MeshtasticClient._format_node_id(0xDEADBEEF)
assert result == '!deadbeef'
def test_format_broadcast(self):
"""Should format broadcast address."""
from utils.meshtastic import MeshtasticClient
result = MeshtasticClient._format_node_id(0xFFFFFFFF)
assert result == '^all'
# =============================================================================
# Route Tests (Mocked)
# =============================================================================
class TestMeshtasticRoutes:
"""Tests for Flask route endpoints."""
@pytest.fixture
def app(self):
"""Create Flask test app."""
from flask import Flask
from routes.meshtastic import meshtastic_bp
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(meshtastic_bp)
return app
@pytest.fixture
def client(self, app):
"""Create test client."""
return app.test_client()
def test_status_sdk_not_installed(self, client):
"""GET /meshtastic/status should report SDK unavailable."""
with patch('routes.meshtastic.is_meshtastic_available', return_value=False):
response = client.get('/meshtastic/status')
data = json.loads(response.data)
assert response.status_code == 200
assert data['available'] is False
assert 'not installed' in data['error']
def test_status_not_connected(self, client):
"""GET /meshtastic/status should report not running when disconnected."""
with patch('routes.meshtastic.is_meshtastic_available', return_value=True):
with patch('routes.meshtastic.get_meshtastic_client', return_value=None):
response = client.get('/meshtastic/status')
data = json.loads(response.data)
assert response.status_code == 200
assert data['available'] is True
assert data['running'] is False
def test_start_sdk_not_installed(self, client):
"""POST /meshtastic/start should fail if SDK not installed."""
with patch('routes.meshtastic.is_meshtastic_available', return_value=False):
response = client.post('/meshtastic/start')
data = json.loads(response.data)
assert response.status_code == 400
assert data['status'] == 'error'
def test_stop_always_succeeds(self, client):
"""POST /meshtastic/stop should always succeed."""
with patch('routes.meshtastic.stop_meshtastic'):
response = client.post('/meshtastic/stop')
data = json.loads(response.data)
assert response.status_code == 200
assert data['status'] == 'stopped'
def test_channels_not_connected(self, client):
"""GET /meshtastic/channels should fail if not connected."""
with patch('routes.meshtastic.get_meshtastic_client', return_value=None):
response = client.get('/meshtastic/channels')
data = json.loads(response.data)
assert response.status_code == 400
assert 'Not connected' in data['message']
def test_configure_channel_invalid_index(self, client):
"""POST /meshtastic/channels/<id> should reject invalid index."""
mock_client = Mock()
mock_client.is_running = True
with patch('routes.meshtastic.get_meshtastic_client', return_value=mock_client):
response = client.post(
'/meshtastic/channels/10',
json={'name': 'Test'},
content_type='application/json'
)
data = json.loads(response.data)
assert response.status_code == 400
assert 'must be 0-7' in data['message']
def test_configure_channel_no_params(self, client):
"""POST /meshtastic/channels/<id> should require name or psk."""
mock_client = Mock()
mock_client.is_running = True
with patch('routes.meshtastic.get_meshtastic_client', return_value=mock_client):
response = client.post(
'/meshtastic/channels/0',
json={},
content_type='application/json'
)
data = json.loads(response.data)
assert response.status_code == 400
assert 'Must provide' in data['message']
def test_messages_empty(self, client):
"""GET /meshtastic/messages should return empty list initially."""
with patch('routes.meshtastic._recent_messages', []):
response = client.get('/meshtastic/messages')
data = json.loads(response.data)
assert response.status_code == 200
assert data['status'] == 'ok'
assert data['messages'] == []
assert data['count'] == 0
def test_messages_with_limit(self, client):
"""GET /meshtastic/messages should respect limit param."""
test_messages = [{'id': i} for i in range(10)]
with patch('routes.meshtastic._recent_messages', test_messages):
response = client.get('/meshtastic/messages?limit=3')
data = json.loads(response.data)
assert response.status_code == 200
assert len(data['messages']) == 3
# Should return last 3 (most recent)
assert data['messages'][0]['id'] == 7
def test_messages_filter_by_channel(self, client):
"""GET /meshtastic/messages should filter by channel."""
test_messages = [
{'id': 1, 'channel': 0},
{'id': 2, 'channel': 1},
{'id': 3, 'channel': 0},
]
with patch('routes.meshtastic._recent_messages', test_messages):
response = client.get('/meshtastic/messages?channel=0')
data = json.loads(response.data)
assert response.status_code == 200
assert len(data['messages']) == 2
assert all(m['channel'] == 0 for m in data['messages'])
def test_stream_endpoint_exists(self, client):
"""GET /meshtastic/stream should return SSE content type."""
response = client.get('/meshtastic/stream')
assert response.content_type == 'text/event-stream'
def test_node_not_connected(self, client):
"""GET /meshtastic/node should fail if not connected."""
with patch('routes.meshtastic.get_meshtastic_client', return_value=None):
response = client.get('/meshtastic/node')
data = json.loads(response.data)
assert response.status_code == 400
assert 'Not connected' in data['message']
# =============================================================================
# Integration Tests (Mocked SDK)
# =============================================================================
class TestMeshtasticClientMocked:
"""Tests for MeshtasticClient with mocked SDK."""
def test_client_init(self):
"""MeshtasticClient should initialize with default state."""
from utils.meshtastic import MeshtasticClient
client = MeshtasticClient()
assert client.is_running is False
assert client.device_path is None
assert client.error is None
def test_client_connect_no_sdk(self):
"""MeshtasticClient.connect should fail gracefully without SDK."""
from utils.meshtastic import MeshtasticClient
with patch('utils.meshtastic.HAS_MESHTASTIC', False):
client = MeshtasticClient()
result = client.connect()
assert result is False
assert 'not installed' in client.error
def test_client_disconnect_idempotent(self):
"""MeshtasticClient.disconnect should be safe to call multiple times."""
from utils.meshtastic import MeshtasticClient
client = MeshtasticClient()
# Should not raise even when not connected
client.disconnect()
client.disconnect()
assert client.is_running is False
+476
View File
@@ -0,0 +1,476 @@
"""Meshtastic device management and message handling.
This module provides integration with Meshtastic mesh networking devices,
allowing INTERCEPT to receive and decode messages from LoRa mesh networks.
Requires a physical Meshtastic device connected via USB/Serial.
Install SDK with: pip install meshtastic
"""
from __future__ import annotations
import base64
import hashlib
import secrets
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable
from utils.logging import get_logger
logger = get_logger('intercept.meshtastic')
# Meshtastic SDK import (optional dependency)
try:
import meshtastic
import meshtastic.serial_interface
from pubsub import pub
HAS_MESHTASTIC = True
except ImportError:
HAS_MESHTASTIC = False
logger.warning("Meshtastic SDK not installed. Install with: pip install meshtastic")
@dataclass
class MeshtasticMessage:
"""Decoded Meshtastic message."""
from_id: str
to_id: str
message: str | None
portnum: str
channel: int
rssi: int | None
snr: float | None
hop_limit: int | None
timestamp: datetime
raw_packet: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
'type': 'meshtastic',
'from': self.from_id,
'to': self.to_id,
'message': self.message,
'portnum': self.portnum,
'channel': self.channel,
'rssi': self.rssi,
'snr': self.snr,
'hop_limit': self.hop_limit,
'timestamp': self.timestamp.isoformat(),
}
@dataclass
class ChannelConfig:
"""Meshtastic channel configuration."""
index: int
name: str
psk: bytes
role: int # 0=DISABLED, 1=PRIMARY, 2=SECONDARY
def to_dict(self) -> dict:
"""Convert to dict for API response (hides raw PSK)."""
role_names = ['DISABLED', 'PRIMARY', 'SECONDARY']
# Default key is 1 byte (0x01) or the well-known AQ== base64
is_default = self.psk in (b'\x01', b'')
return {
'index': self.index,
'name': self.name,
'role': role_names[self.role] if self.role < len(role_names) else 'UNKNOWN',
'encrypted': len(self.psk) > 1,
'key_type': self._get_key_type(),
'is_default_key': is_default,
}
def _get_key_type(self) -> str:
"""Determine encryption type from key length."""
if len(self.psk) == 0:
return 'none'
elif len(self.psk) == 1:
return 'default'
elif len(self.psk) == 16:
return 'AES-128'
elif len(self.psk) == 32:
return 'AES-256'
else:
return 'unknown'
@dataclass
class NodeInfo:
"""Meshtastic node information."""
num: int
user_id: str
long_name: str
short_name: str
hw_model: str
latitude: float | None
longitude: float | None
altitude: int | None
def to_dict(self) -> dict:
return {
'num': self.num,
'user_id': self.user_id,
'long_name': self.long_name,
'short_name': self.short_name,
'hw_model': self.hw_model,
'position': {
'latitude': self.latitude,
'longitude': self.longitude,
'altitude': self.altitude,
} if self.latitude is not None else None,
}
class MeshtasticClient:
"""Client for connecting to Meshtastic devices."""
def __init__(self):
self._interface = None
self._running = False
self._callback: Callable[[MeshtasticMessage], None] | None = None
self._lock = threading.Lock()
self._device_path: str | None = None
self._error: str | None = None
@property
def is_running(self) -> bool:
return self._running
@property
def device_path(self) -> str | None:
return self._device_path
@property
def error(self) -> str | None:
return self._error
def set_callback(self, callback: Callable[[MeshtasticMessage], None]) -> None:
"""Set callback for received messages."""
self._callback = callback
def connect(self, device: str | None = None) -> bool:
"""
Connect to a Meshtastic device.
Args:
device: Serial port path (e.g., /dev/ttyUSB0, /dev/ttyACM0).
If None, auto-discovers first available device.
Returns:
True if connected successfully.
"""
if not HAS_MESHTASTIC:
self._error = "Meshtastic SDK not installed. Install with: pip install meshtastic"
return False
with self._lock:
if self._running:
return True
try:
# Subscribe to message events before connecting
pub.subscribe(self._on_receive, "meshtastic.receive")
pub.subscribe(self._on_connection, "meshtastic.connection.established")
pub.subscribe(self._on_disconnect, "meshtastic.connection.lost")
# Connect to device
if device:
self._interface = meshtastic.serial_interface.SerialInterface(device)
self._device_path = device
else:
# Auto-discover
self._interface = meshtastic.serial_interface.SerialInterface()
self._device_path = "auto"
self._running = True
self._error = None
logger.info(f"Connected to Meshtastic device: {self._device_path}")
return True
except Exception as e:
self._error = str(e)
logger.error(f"Failed to connect to Meshtastic: {e}")
self._cleanup_subscriptions()
return False
def disconnect(self) -> None:
"""Disconnect from the Meshtastic device."""
with self._lock:
if self._interface:
try:
self._interface.close()
except Exception as e:
logger.warning(f"Error closing Meshtastic interface: {e}")
self._interface = None
self._cleanup_subscriptions()
self._running = False
self._device_path = None
logger.info("Disconnected from Meshtastic device")
def _cleanup_subscriptions(self) -> None:
"""Unsubscribe from pubsub topics."""
if HAS_MESHTASTIC:
try:
pub.unsubscribe(self._on_receive, "meshtastic.receive")
except Exception:
pass
try:
pub.unsubscribe(self._on_connection, "meshtastic.connection.established")
except Exception:
pass
try:
pub.unsubscribe(self._on_disconnect, "meshtastic.connection.lost")
except Exception:
pass
def _on_connection(self, interface, topic=None) -> None:
"""Handle connection established event."""
logger.info("Meshtastic connection established")
def _on_disconnect(self, interface, topic=None) -> None:
"""Handle connection lost event."""
logger.warning("Meshtastic connection lost")
self._running = False
def _on_receive(self, packet: dict, interface) -> None:
"""Handle received packet from Meshtastic device."""
if not self._callback:
return
try:
decoded = packet.get('decoded', {})
# Extract text message if present
message = None
portnum = decoded.get('portnum', 'UNKNOWN')
if portnum == 'TEXT_MESSAGE_APP':
message = decoded.get('text')
elif 'payload' in decoded:
# For other message types, include payload info
message = f"[{portnum}]"
msg = MeshtasticMessage(
from_id=self._format_node_id(packet.get('from', 0)),
to_id=self._format_node_id(packet.get('to', 0)),
message=message,
portnum=portnum,
channel=packet.get('channel', 0),
rssi=packet.get('rxRssi'),
snr=packet.get('rxSnr'),
hop_limit=packet.get('hopLimit'),
timestamp=datetime.now(timezone.utc),
raw_packet=packet,
)
self._callback(msg)
logger.debug(f"Received: {msg.from_id} -> {msg.to_id}: {msg.portnum}")
except Exception as e:
logger.error(f"Error processing Meshtastic packet: {e}")
@staticmethod
def _format_node_id(node_num: int) -> str:
"""Format node number as hex string."""
if node_num == 0xFFFFFFFF:
return "^all"
return f"!{node_num:08x}"
def get_node_info(self) -> NodeInfo | None:
"""Get local node information."""
if not self._interface:
return None
try:
node = self._interface.getMyNodeInfo()
user = node.get('user', {})
position = node.get('position', {})
return NodeInfo(
num=node.get('num', 0),
user_id=user.get('id', ''),
long_name=user.get('longName', ''),
short_name=user.get('shortName', ''),
hw_model=user.get('hwModel', 'UNKNOWN'),
latitude=position.get('latitude'),
longitude=position.get('longitude'),
altitude=position.get('altitude'),
)
except Exception as e:
logger.error(f"Error getting node info: {e}")
return None
def get_channels(self) -> list[ChannelConfig]:
"""Get all configured channels."""
if not self._interface:
return []
channels = []
try:
for i, ch in enumerate(self._interface.localNode.channels):
if ch.role != 0: # 0 = DISABLED
channels.append(ChannelConfig(
index=i,
name=ch.settings.name or f"Channel {i}",
psk=bytes(ch.settings.psk) if ch.settings.psk else b'',
role=ch.role,
))
except Exception as e:
logger.error(f"Error getting channels: {e}")
return channels
def set_channel(self, index: int, name: str | None = None,
psk: str | None = None) -> tuple[bool, str]:
"""
Configure a channel with encryption key.
Args:
index: Channel index (0-7)
name: Channel name (optional)
psk: Pre-shared key in one of these formats:
- "none" - disable encryption
- "default" - use default (public) key
- "random" - generate new AES-256 key
- "base64:..." - base64-encoded key (16 or 32 bytes)
- "0x..." - hex-encoded key (16 or 32 bytes)
- "simple:passphrase" - derive key from passphrase (AES-256)
Returns:
Tuple of (success, message)
"""
if not self._interface:
return False, "Not connected to device"
if not 0 <= index <= 7:
return False, f"Invalid channel index: {index}. Must be 0-7."
try:
ch = self._interface.localNode.channels[index]
if name is not None:
ch.settings.name = name
if psk is not None:
psk_bytes = self._parse_psk(psk)
if psk_bytes is None:
return False, f"Invalid PSK format: {psk}"
ch.settings.psk = psk_bytes
# Enable channel if it was disabled
if ch.role == 0:
ch.role = 2 # SECONDARY (1 = PRIMARY, only one allowed)
# Write config to device
self._interface.localNode.writeChannel(index)
logger.info(f"Channel {index} configured: {name or ch.settings.name}")
return True, f"Channel {index} configured successfully"
except Exception as e:
logger.error(f"Error setting channel: {e}")
return False, str(e)
def _parse_psk(self, psk: str) -> bytes | None:
"""
Parse PSK string into bytes.
Supported formats:
- "none" - no encryption (empty key)
- "default" - use default public key (1 byte)
- "random" - generate random 32-byte AES-256 key
- "base64:..." - base64-encoded key
- "0x..." - hex-encoded key
- "simple:passphrase" - SHA-256 hash of passphrase
"""
psk = psk.strip()
if psk.lower() == 'none':
return b''
if psk.lower() == 'default':
# Default key (1 byte = use default)
return b'\x01'
if psk.lower() == 'random':
# Generate random 32-byte key
return secrets.token_bytes(32)
if psk.startswith('base64:'):
try:
decoded = base64.b64decode(psk[7:])
if len(decoded) not in (0, 1, 16, 32):
logger.warning(f"PSK length {len(decoded)} is non-standard")
return decoded
except Exception:
return None
if psk.startswith('0x'):
try:
decoded = bytes.fromhex(psk[2:])
if len(decoded) not in (0, 1, 16, 32):
logger.warning(f"PSK length {len(decoded)} is non-standard")
return decoded
except Exception:
return None
if psk.startswith('simple:'):
# Hash passphrase to create 32-byte AES-256 key
passphrase = psk[7:].encode('utf-8')
return hashlib.sha256(passphrase).digest()
# Try as raw base64 (for compatibility)
try:
decoded = base64.b64decode(psk)
if len(decoded) in (0, 1, 16, 32):
return decoded
except Exception:
pass
return None
# Global client instance
_client: MeshtasticClient | None = None
def get_meshtastic_client() -> MeshtasticClient | None:
"""Get the global Meshtastic client instance."""
return _client
def start_meshtastic(device: str | None = None,
callback: Callable[[MeshtasticMessage], None] | None = None) -> bool:
"""
Start the Meshtastic client.
Args:
device: Serial port path (optional, auto-discovers if not provided)
callback: Function to call when messages are received
Returns:
True if started successfully
"""
global _client
if _client and _client.is_running:
return True
_client = MeshtasticClient()
if callback:
_client.set_callback(callback)
return _client.connect(device)
def stop_meshtastic() -> None:
"""Stop the Meshtastic client."""
global _client
if _client:
_client.disconnect()
_client = None
def is_meshtastic_available() -> bool:
"""Check if Meshtastic SDK is installed."""
return HAS_MESHTASTIC