mirror of
https://github.com/kc1awv/rrcd.git
synced 2026-06-08 22:21:53 -07:00
274 lines
8.8 KiB
Python
274 lines
8.8 KiB
Python
from __future__ import annotations
|
|
|
|
import threading
|
|
from dataclasses import dataclass
|
|
|
|
import RNS
|
|
|
|
from rrcd.codec import decode
|
|
from rrcd.constants import K_BODY, K_NICK, K_T, T_JOIN, T_JOINED, T_PART, T_PARTED
|
|
from rrcd.envelope import make_envelope
|
|
from rrcd.router import MessageRouter
|
|
from rrcd.session import SessionManager
|
|
|
|
|
|
class _FakeStats:
|
|
def __init__(self) -> None:
|
|
self.counters: dict[str, int] = {}
|
|
|
|
def inc(self, key: str, delta: int = 1) -> None:
|
|
self.counters[key] = self.counters.get(key, 0) + delta
|
|
|
|
|
|
class _FakeMessageHelper:
|
|
def __init__(self) -> None:
|
|
self.errors: list[tuple[object, str, str | None]] = []
|
|
self.notices: list[tuple[object, str | None, str]] = []
|
|
|
|
def emit_error(
|
|
self, outgoing, link, *, src: bytes, text: str, room: str | None = None
|
|
) -> None:
|
|
self.errors.append((link, text, room))
|
|
|
|
def emit_notice(self, outgoing, link, room: str | None, text: str) -> None:
|
|
self.notices.append((link, room, text))
|
|
|
|
def queue_payload(self, outgoing, link, payload: bytes) -> None:
|
|
outgoing.append((link, payload))
|
|
|
|
def queue_env(self, outgoing, link, env: dict) -> None:
|
|
from rrcd.codec import encode
|
|
|
|
outgoing.append((link, encode(env)))
|
|
|
|
|
|
class _FakeRoomManager:
|
|
def __init__(self, members_by_room: dict[str, list[object]]) -> None:
|
|
self._members_by_room = {
|
|
room: list(members) for room, members in members_by_room.items()
|
|
}
|
|
self._room_registry: set[str] = set(members_by_room)
|
|
self._room_state: dict[str, dict] = {room: {} for room in members_by_room}
|
|
|
|
def get_room_members(self, room: str) -> list[object]:
|
|
return list(self._members_by_room.get(room, []))
|
|
|
|
def _room_state_ensure(self, room: str, founder: bytes | None = None) -> dict:
|
|
return self._room_state.setdefault(room, {})
|
|
|
|
def _room_state_get(self, room: str) -> dict | None:
|
|
return self._room_state.get(room)
|
|
|
|
def add_member(self, room: str, link: object) -> None:
|
|
self._members_by_room.setdefault(room, []).append(link)
|
|
|
|
def remove_member(self, room: str, link: object) -> None:
|
|
members = self._members_by_room.get(room, [])
|
|
self._members_by_room[room] = [member for member in members if member != link]
|
|
|
|
def touch_room(self, room: str) -> None:
|
|
return
|
|
|
|
def persist_room_state(self, link: object, room: str) -> None:
|
|
return
|
|
|
|
def get_room_mode_string(self, room: str) -> str:
|
|
return "+"
|
|
|
|
def is_invited(self, room: str, peer_hash: bytes) -> bool:
|
|
return False
|
|
|
|
def is_room_op(self, room: str, peer_hash: bytes) -> bool:
|
|
return False
|
|
|
|
def is_room_banned(self, room: str, peer_hash: bytes) -> bool:
|
|
return False
|
|
|
|
def is_room_moderated(self, room: str) -> bool:
|
|
return False
|
|
|
|
def is_room_voiced(self, room: str, peer_hash: bytes) -> bool:
|
|
return True
|
|
|
|
|
|
class _FakeSessionManager:
|
|
def __init__(self, sessions: dict[object, dict]) -> None:
|
|
self.sessions = sessions
|
|
|
|
def update_nick_index(
|
|
self, link: object, old_nick: str | None, new_nick: str | None
|
|
) -> None:
|
|
return
|
|
|
|
|
|
@dataclass
|
|
class _FakeIdentity:
|
|
hash: bytes
|
|
|
|
|
|
@dataclass
|
|
class _FakeConfig:
|
|
include_joined_member_list: bool = True
|
|
max_nick_bytes: int = 32
|
|
max_msg_body_bytes: int = 350
|
|
max_rooms_per_session: int = 8
|
|
rate_limit_msgs_per_minute: int = 240
|
|
|
|
|
|
class _FakeLink:
|
|
MDU = 10000
|
|
|
|
|
|
class _FakeHub:
|
|
def __init__(self, room_manager: _FakeRoomManager, session_manager) -> None:
|
|
import logging
|
|
|
|
self.log = logging.getLogger("test")
|
|
self.identity = _FakeIdentity(hash=b"hub")
|
|
self.config = _FakeConfig()
|
|
self.stats_manager = _FakeStats()
|
|
self.message_helper = _FakeMessageHelper()
|
|
self.room_manager = room_manager
|
|
self.session_manager = session_manager
|
|
self._state_lock = threading.RLock()
|
|
|
|
def _norm_room(self, room: str) -> str:
|
|
return room.lower()
|
|
|
|
def _fmt_hash(self, value: bytes) -> str:
|
|
return value.hex()
|
|
|
|
def _fmt_link_id(self, link: object) -> str:
|
|
return "link"
|
|
|
|
|
|
def test_joined_fanout_includes_joiner_nick_for_existing_members() -> None:
|
|
joining_link = _FakeLink()
|
|
existing_link = object()
|
|
joining_sess = {"rooms": set(), "nick": "alice", "peer": b"peer"}
|
|
existing_sess = {"rooms": {"#general"}, "nick": "bob", "peer": b"other"}
|
|
room_manager = _FakeRoomManager({"#general": [existing_link]})
|
|
session_manager = _FakeSessionManager(
|
|
{joining_link: joining_sess, existing_link: existing_sess}
|
|
)
|
|
hub = _FakeHub(room_manager, session_manager)
|
|
router = MessageRouter(hub)
|
|
outgoing: list[tuple[object, bytes]] = []
|
|
|
|
router._handle_join(
|
|
joining_link,
|
|
joining_sess,
|
|
b"peer",
|
|
make_envelope(T_JOIN, src=b"peer", room="#general"),
|
|
outgoing,
|
|
)
|
|
|
|
payloads = {link: decode(payload) for link, payload in outgoing}
|
|
|
|
assert payloads[existing_link][K_T] == T_JOINED
|
|
assert payloads[existing_link][K_BODY] == [b"peer"]
|
|
assert payloads[existing_link][K_NICK] == "alice"
|
|
|
|
assert payloads[joining_link][K_T] == T_JOINED
|
|
assert payloads[joining_link][K_BODY] == [b"other", b"peer"]
|
|
assert K_NICK not in payloads[joining_link]
|
|
|
|
|
|
def test_joined_fanout_omits_nick_when_joiner_has_no_nick() -> None:
|
|
joining_link = _FakeLink()
|
|
existing_link = object()
|
|
joining_sess = {"rooms": set(), "nick": None, "peer": b"peer"}
|
|
existing_sess = {"rooms": {"#general"}, "nick": "bob", "peer": b"other"}
|
|
room_manager = _FakeRoomManager({"#general": [existing_link]})
|
|
session_manager = _FakeSessionManager(
|
|
{joining_link: joining_sess, existing_link: existing_sess}
|
|
)
|
|
hub = _FakeHub(room_manager, session_manager)
|
|
router = MessageRouter(hub)
|
|
outgoing: list[tuple[object, bytes]] = []
|
|
|
|
router._handle_join(
|
|
joining_link,
|
|
joining_sess,
|
|
b"peer",
|
|
make_envelope(T_JOIN, src=b"peer", room="#general"),
|
|
outgoing,
|
|
)
|
|
|
|
payloads = {link: decode(payload) for link, payload in outgoing}
|
|
|
|
assert payloads[existing_link][K_T] == T_JOINED
|
|
assert payloads[existing_link][K_BODY] == [b"peer"]
|
|
assert K_NICK not in payloads[existing_link]
|
|
|
|
|
|
def test_parted_fanout_includes_departing_nick_for_existing_members() -> None:
|
|
parting_link = _FakeLink()
|
|
remaining_link = object()
|
|
parting_sess = {"rooms": {"#general"}, "nick": "alice", "peer": b"peer"}
|
|
remaining_sess = {"rooms": {"#general"}, "nick": "bob", "peer": b"other"}
|
|
room_manager = _FakeRoomManager({"#general": [parting_link, remaining_link]})
|
|
session_manager = _FakeSessionManager(
|
|
{parting_link: parting_sess, remaining_link: remaining_sess}
|
|
)
|
|
hub = _FakeHub(room_manager, session_manager)
|
|
router = MessageRouter(hub)
|
|
outgoing: list[tuple[object, bytes]] = []
|
|
|
|
router._handle_part(
|
|
parting_link,
|
|
parting_sess,
|
|
b"peer",
|
|
make_envelope(T_PART, src=b"peer", room="#general"),
|
|
outgoing,
|
|
)
|
|
|
|
payloads = {link: decode(payload) for link, payload in outgoing}
|
|
|
|
assert payloads[remaining_link][K_T] == T_PARTED
|
|
assert payloads[remaining_link][K_BODY] == [b"peer"]
|
|
assert payloads[remaining_link][K_NICK] == "alice"
|
|
|
|
assert payloads[parting_link][K_T] == T_PARTED
|
|
assert payloads[parting_link][K_BODY] == [b"peer"]
|
|
assert K_NICK not in payloads[parting_link]
|
|
|
|
|
|
def test_disconnect_parted_fanout_includes_cached_nick(monkeypatch) -> None:
|
|
closing_link = _FakeLink()
|
|
remaining_link = object()
|
|
room_manager = _FakeRoomManager({"#general": [closing_link, remaining_link]})
|
|
hub = _FakeHub(room_manager, session_manager=None)
|
|
session_manager = SessionManager(hub)
|
|
hub.session_manager = session_manager
|
|
session_manager.sessions = {
|
|
closing_link: {"rooms": {"#general"}, "peer": b"peer", "nick": "alice"},
|
|
remaining_link: {"rooms": {"#general"}, "peer": b"other", "nick": "bob"},
|
|
}
|
|
session_manager._rate = {closing_link: object(), remaining_link: object()}
|
|
session_manager._index_by_hash = {b"peer": closing_link, b"other": remaining_link}
|
|
sent_packets: list[tuple[object, bytes]] = []
|
|
|
|
class _Packet:
|
|
def __init__(self, link: object, payload: bytes) -> None:
|
|
self.link = link
|
|
self.payload = payload
|
|
|
|
def send(self) -> None:
|
|
sent_packets.append((self.link, self.payload))
|
|
|
|
monkeypatch.setattr(RNS, "Packet", _Packet)
|
|
|
|
peer_hash, nick, rooms_count = session_manager.on_link_closed(closing_link)
|
|
|
|
assert peer_hash == b"peer"
|
|
assert nick == "alice"
|
|
assert rooms_count == 1
|
|
assert len(sent_packets) == 1
|
|
assert sent_packets[0][0] is remaining_link
|
|
|
|
decoded = decode(sent_packets[0][1])
|
|
assert decoded[K_T] == T_PARTED
|
|
assert decoded[K_BODY] == [b"peer"]
|
|
assert decoded[K_NICK] == "alice"
|