fix(meshcore): fix thread safety in _set_state/connect, add missing tests

- Lock-protect `get_state` and `_set_state` to prevent data race
  between Flask and asyncio daemon threads
- Atomically check-and-set CONNECTING guard in `connect()` to close
  TOCTOU window between concurrent Flask threads
- Push status events outside the lock in both `_set_state` and
  `connect()` to avoid potential deadlock
- Add TestMeshcoreContact, TestMeshcoreClientStateMachine tests
  covering to_dict keys, queue push on state change, message append
  and 500-item cap (9 -> 13 tests)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-05-11 11:12:11 +01:00
parent 6807ee6878
commit 80bbdb2c09
2 changed files with 92 additions and 8 deletions
+71
View File
@@ -98,3 +98,74 @@ class TestConnectionState:
assert ConnectionState.CONNECTING.value == "connecting"
assert ConnectionState.CONNECTED.value == "connected"
assert ConnectionState.ERROR.value == "error"
class TestMeshcoreContact:
def test_to_dict_keys(self):
from utils.meshcore import MeshcoreContact
c = MeshcoreContact(
node_id="ab" * 32,
name="Alice",
public_key="ab" * 32,
last_msg=None,
)
d = c.to_dict()
assert d["node_id"] == "ab" * 32
assert d["name"] == "Alice"
assert d["last_msg"] is None
class TestMeshcoreClientStateMachine:
def test_status_event_pushed_on_connect_state_change(self):
from utils.meshcore import ConnectionState, MeshcoreClient
client = MeshcoreClient()
# Drain any queued events from __init__ (none expected, but be safe)
while not client.get_queue().empty():
client.get_queue().get_nowait()
# Call on_connected directly (simulating what AsyncWorker would call)
client.on_connected(transport="serial", device="/dev/ttyUSB0")
assert client.get_state() == ConnectionState.CONNECTED
event = client.get_queue().get_nowait()
assert event["type"] == "status"
assert event["data"]["state"] == "connected"
def test_on_message_appends_and_pushes_to_queue(self):
from utils.meshcore import MeshcoreClient, MeshcoreMessage
client = MeshcoreClient()
msg = MeshcoreMessage(
id="m1",
sender_id="A",
recipient_id="BROADCAST",
text="hi",
timestamp=datetime.now(timezone.utc),
hop_count=0,
snr=None,
is_direct=False,
)
client.on_message(msg)
assert len(client.get_messages()) == 1
event = client.get_queue().get_nowait()
assert event["type"] == "message"
assert event["data"]["text"] == "hi"
def test_on_message_caps_at_500(self):
from utils.meshcore import MeshcoreClient, MeshcoreMessage
client = MeshcoreClient()
for i in range(510):
client.on_message(
MeshcoreMessage(
id=str(i),
sender_id="X",
recipient_id="BROADCAST",
text=f"msg{i}",
timestamp=datetime.now(timezone.utc),
hop_count=0,
snr=None,
is_direct=False,
)
)
assert len(client.get_messages()) == 500
+21 -8
View File
@@ -263,10 +263,13 @@ class MeshcoreClient:
def get_state(self) -> ConnectionState:
"""Return the current connection state."""
return self._state
with self._lock:
return self._state
def _set_state(self, state: ConnectionState, **extra) -> None:
self._state = state
with self._lock:
self._state = state
# Push the status event OUTSIDE the lock (avoids deadlock; _push is queue-based)
payload: dict = {"state": state.value}
payload.update(extra)
self._push({"type": "status", "data": payload})
@@ -291,16 +294,26 @@ class MeshcoreClient:
def connect(self, config: ConnectionConfig) -> None:
"""Start background AsyncWorker with the given connection config."""
if self._state == ConnectionState.CONNECTING:
return
with self._lock:
if self._state == ConnectionState.CONNECTING:
return
self._state = ConnectionState.CONNECTING
# Push status event outside the lock
self._push({"type": "status", "data": {"state": ConnectionState.CONNECTING.value}})
if isinstance(config, BLEConfig) and _is_docker():
self._set_state(
ConnectionState.ERROR,
message="BLE unavailable in Docker. Run meshcore-proxy on the host and connect via TCP.",
with self._lock:
self._state = ConnectionState.ERROR
self._push(
{
"type": "status",
"data": {
"state": ConnectionState.ERROR.value,
"message": "BLE unavailable in Docker. Run meshcore-proxy on the host and connect via TCP.",
},
}
)
return
self._config = config
self._set_state(ConnectionState.CONNECTING)
from utils.meshcore_client import AsyncWorker # imported lazily (Task 3)
self._worker = AsyncWorker(config, self)