From 7e5675fdd7debdfcec6bb0a9f0229828f3352cb5 Mon Sep 17 00:00:00 2001 From: kc1awv Date: Wed, 7 Jan 2026 10:57:35 -0500 Subject: [PATCH] refactor room management --- pyproject.toml | 2 +- rrcd/commands.py | 166 +++++----- rrcd/resources.py | 10 +- rrcd/rooms.py | 702 ++++++++++++++++++++++++++++++++++++++++++ rrcd/router.py | 95 +++--- rrcd/service.py | 763 ++++++++-------------------------------------- rrcd/session.py | 7 +- 7 files changed, 971 insertions(+), 774 deletions(-) create mode 100644 rrcd/rooms.py diff --git a/pyproject.toml b/pyproject.toml index 809de57..6245938 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ version = {attr = "rrcd.__version__"} [tool.ruff] target-version = "py311" -line-length = 88 +line-length = 100 [tool.ruff.lint] select = ["E", "F", "I", "B", "UP"] diff --git a/rrcd/commands.py b/rrcd/commands.py index 6fb8d4c..a1917b5 100644 --- a/rrcd/commands.py +++ b/rrcd/commands.py @@ -79,14 +79,14 @@ class CommandHandler: # List all registered, non-private rooms with their topics with self.hub._state_lock: registered_rooms = [] - for room_name, st in self.hub._room_state.items(): + for room_name, st in self.hub.room_manager._room_state.items(): if st.get("registered") and not st.get("private"): topic = st.get("topic") registered_rooms.append((room_name, topic)) # Also check room registry for rooms not currently in room_state - for room_name, reg in self.hub._room_registry.items(): - if room_name not in self.hub._room_state: + for room_name, reg in self.hub.room_manager._room_registry.items(): + if room_name not in self.hub.room_manager._room_state: if not reg.get("private"): topic = reg.get("topic") registered_rooms.append((room_name, topic)) @@ -123,14 +123,14 @@ class CommandHandler: return True # Check if room is private - only server operators can see private rooms - st = self.hub._room_state_get(r) + st = self.hub.room_manager._room_state_get(r) if st and st.get("private"): if not self.hub._is_server_op(peer_hash): self._emit_notice(outgoing, link, None, f"room {r} is private") return True members = [] - for other in sorted(self.hub.rooms.get(r, set()), key=lambda x: id(x)): + for other in sorted(self.hub.room_manager.get_room_members(r), key=lambda x: id(x)): s = self.hub.session_manager.sessions.get(other) if not s: continue @@ -164,7 +164,7 @@ class CommandHandler: self._emit_notice(outgoing, link, room, f"bad room: {e}") return True - if not self.hub._is_room_op(r, peer_hash): + if not self.hub.room_manager.is_room_op(r, peer_hash): if self.hub.identity is not None: self._emit_error( outgoing, @@ -190,10 +190,10 @@ class CommandHandler: return True tsess["rooms"].discard(r) - if r in self.hub.rooms: - self.hub.rooms[r].discard(target_link) - if not self.hub.rooms[r]: - self.hub.rooms.pop(r, None) + if self.hub.room_manager.get_room_members(r): + self.hub.room_manager.rooms[r].discard(target_link) + if not self.hub.room_manager.rooms[r]: + pass # room cleanup handled by room_manager if self.hub.identity is not None: self._emit_error( @@ -327,11 +327,11 @@ class CommandHandler: ) return True - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) # Clean up expired invites (best-effort). - if self.hub._prune_expired_invites(st) and bool(st.get("registered")): - self.hub._persist_room_state_to_registry(link, r) + if self.hub.room_manager.prune_expired_invites(r) and bool(st.get("registered")): + self.hub.room_manager.persist_room_state(link, r) founder = st.get("founder") if not ( isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash @@ -346,7 +346,7 @@ class CommandHandler: ) return True - if not self.hub._room_registry_path_for_writes(): + if not self.hub.room_manager.get_registry_path_for_writes(): self._emit_notice( outgoing, link, room, "cannot register room: no room_registry_path" ) @@ -357,10 +357,10 @@ class CommandHandler: st["topic_ops_only"] = True if isinstance(founder, (bytes, bytearray)): st.setdefault("ops", set()).add(bytes(founder)) - self.hub._touch_room(r) + self.hub.room_manager.touch_room(r) # Ensure registry mirrors registered rooms. - self.hub._room_registry[r] = { + self.hub.room_manager._room_registry[r] = { "founder": bytes(founder) if isinstance(founder, (bytes, bytearray)) else None, @@ -379,7 +379,7 @@ class CommandHandler: "last_used_ts": st.get("last_used_ts"), } - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice(outgoing, link, room, f"registered room {r}") return True @@ -403,7 +403,7 @@ class CommandHandler: ) return True - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) founder = st.get("founder") if not ( isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash @@ -423,11 +423,11 @@ class CommandHandler: return True st["registered"] = False - self.hub._room_registry.pop(r, None) + self.hub.room_manager._room_registry.pop(r, None) self.hub._delete_room_from_registry(link, r) # Drop state if empty. - if r not in self.hub.rooms or not self.hub.rooms.get(r): - self.hub._room_state.pop(r, None) + if not self.hub.room_manager.get_room_members(r) or not self.hub.room_manager.get_room_members(r): + self.hub.room_manager._room_state.pop(r, None) self._emit_notice(outgoing, link, room, f"unregistered room {r}") return True @@ -440,7 +440,7 @@ class CommandHandler: except Exception as e: self._emit_notice(outgoing, link, None, f"bad room: {e}") return True - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) if len(parts) == 2: topic = st.get("topic") self._emit_notice( @@ -451,8 +451,8 @@ class CommandHandler: ) return True - if not self.hub._is_room_op(r, peer_hash): - st = self.hub._room_state_ensure(r) + if not self.hub.room_manager.is_room_op(r, peer_hash): + st = self.hub.room_manager._room_state_ensure(r) if bool(st.get("topic_ops_only", False)): if self.hub.identity is not None: self._emit_error( @@ -466,10 +466,10 @@ class CommandHandler: topic = " ".join(parts[2:]).strip() st["topic"] = topic if topic else None - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) # Broadcast topic change to current members. - for other in list(self.hub.rooms.get(r, set())): + for other in list(self.hub.room_manager.get_room_members(r)): self._emit_notice( outgoing, other, @@ -489,7 +489,7 @@ class CommandHandler: except Exception as e: self._emit_notice(outgoing, link, None, f"bad room: {e}") return True - if not self.hub._is_room_op(r, peer_hash): + if not self.hub.room_manager.is_room_op(r, peer_hash): if self.hub.identity is not None: self._emit_error( outgoing, @@ -507,7 +507,7 @@ class CommandHandler: ) return True - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) founder = st.get("founder") founder_b = ( bytes(founder) if isinstance(founder, (bytes, bytearray)) else None @@ -520,8 +520,8 @@ class CommandHandler: st["ops"] = ops if cmd == "op": ops.add(target_hash) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice(outgoing, link, room, f"op granted in {r}") return True else: @@ -529,8 +529,8 @@ class CommandHandler: self._emit_notice(outgoing, link, room, "cannot deop founder") return True ops.discard(target_hash) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice(outgoing, link, room, f"op removed in {r}") return True @@ -540,14 +540,14 @@ class CommandHandler: st["voiced"] = voiced if cmd == "voice": voiced.add(target_hash) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice(outgoing, link, room, f"voice granted in {r}") return True else: voiced.discard(target_hash) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice(outgoing, link, room, f"voice removed in {r}") return True @@ -565,7 +565,7 @@ class CommandHandler: except Exception as e: self._emit_notice(outgoing, link, None, f"bad room: {e}") return True - if not self.hub._is_room_op(r, peer_hash): + if not self.hub.room_manager.is_room_op(r, peer_hash): if self.hub.identity is not None: self._emit_error( outgoing, @@ -576,41 +576,41 @@ class CommandHandler: ) return True flag = parts[2].strip().lower() - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) if flag in ("+m", "-m"): st["moderated"] = flag == "+m" - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) - self.hub._broadcast_room_mode(r, outgoing) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) + self.hub.room_manager.broadcast_room_mode(r, outgoing) return True if flag in ("+i", "-i"): st["invite_only"] = flag == "+i" - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) - self.hub._broadcast_room_mode(r, outgoing) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) + self.hub.room_manager.broadcast_room_mode(r, outgoing) return True if flag in ("+t", "-t"): st["topic_ops_only"] = flag == "+t" - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) - self.hub._broadcast_room_mode(r, outgoing) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) + self.hub.room_manager.broadcast_room_mode(r, outgoing) return True if flag in ("+n", "-n"): st["no_outside_msgs"] = flag == "+n" - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) - self.hub._broadcast_room_mode(r, outgoing) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) + self.hub.room_manager.broadcast_room_mode(r, outgoing) return True if flag in ("+p", "-p"): st["private"] = flag == "+p" - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) - self.hub._broadcast_room_mode(r, outgoing) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) + self.hub.room_manager.broadcast_room_mode(r, outgoing) return True if flag in ("+k", "-k"): @@ -627,9 +627,9 @@ class CommandHandler: st["key"] = key else: st["key"] = None - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) - self.hub._broadcast_room_mode(r, outgoing) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) + self.hub.room_manager.broadcast_room_mode(r, outgoing) return True if flag in ("+r", "-r"): @@ -676,9 +676,9 @@ class CommandHandler: return True ops.discard(target_hash) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) - for other in list(self.hub.rooms.get(r, set())): + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) + for other in list(self.hub.room_manager.get_room_members(r)): self._emit_notice( outgoing, other, @@ -696,9 +696,9 @@ class CommandHandler: else: voiced.discard(target_hash) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) - for other in list(self.hub.rooms.get(r, set())): + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) + for other in list(self.hub.room_manager.get_room_members(r)): self._emit_notice( outgoing, other, @@ -733,7 +733,7 @@ class CommandHandler: op = parts[2].strip().lower() if op == "list": - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) bans = st.get("bans") if not isinstance(bans, set) or not bans: self._emit_notice(outgoing, link, room, f"no bans in {r}") @@ -761,7 +761,7 @@ class CommandHandler: ) return True - if not self.hub._is_room_op(r, peer_hash): + if not self.hub.room_manager.is_room_op(r, peer_hash): if self.hub.identity is not None: self._emit_error( outgoing, @@ -779,7 +779,7 @@ class CommandHandler: ) return True - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) bans = st.setdefault("bans", set()) if not isinstance(bans, set): bans = set() @@ -787,16 +787,16 @@ class CommandHandler: if op == "add": bans.add(target_hash) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) # If currently present in room, remove them. - for other in list(self.hub.rooms.get(r, set())): + for other in list(self.hub.room_manager.get_room_members(r)): s = self.hub.session_manager.sessions.get(other) ph = s.get("peer") if s else None if isinstance(ph, (bytes, bytearray)) and bytes(ph) == target_hash: s.get("rooms", set()).discard(r) - self.hub.rooms.get(r, set()).discard(other) + self.hub.room_manager.get_room_members(r).discard(other) if self.hub.identity is not None: self._emit_error( outgoing, @@ -805,14 +805,14 @@ class CommandHandler: text=f"banned from {r}", room=r, ) - if r in self.hub.rooms and not self.hub.rooms[r]: - self.hub.rooms.pop(r, None) + if self.hub.room_manager.get_room_members(r) and not self.hub.room_manager.rooms[r]: + pass # room cleanup handled by room_manager self._emit_notice(outgoing, link, room, f"ban added in {r}") return True bans.discard(target_hash) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice(outgoing, link, room, f"ban removed in {r}") return True @@ -832,7 +832,7 @@ class CommandHandler: self._emit_notice(outgoing, link, None, f"bad room: {e}") return True - if not self.hub._is_room_op(r, peer_hash): + if not self.hub.room_manager.is_room_op(r, peer_hash): if self.hub.identity is not None: self._emit_error( outgoing, @@ -844,7 +844,7 @@ class CommandHandler: return True op = parts[2].strip().lower() - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) invited = st.setdefault("invited", {}) if not isinstance(invited, dict): @@ -852,7 +852,7 @@ class CommandHandler: st["invited"] = invited # Drop expired entries before operating. - pruned = self.hub._prune_expired_invites(st) + pruned = self.hub.room_manager.prune_expired_invites(r) if op == "list": now = float(time.time()) @@ -869,8 +869,8 @@ class CommandHandler: items.append(f"{bytes(h).hex()} expires_in={int(exp_f - now)}s") items.sort() if pruned: - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice( outgoing, link, @@ -955,8 +955,8 @@ class CommandHandler: ttl = 900.0 exp = float(time.time()) + ttl invited[target_hash] = exp - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice( outgoing, link, @@ -978,8 +978,8 @@ class CommandHandler: if target_hash in invited: invited.pop(target_hash, None) - self.hub._touch_room(r) - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.touch_room(r) + self.hub.room_manager.persist_room_state(link, r) self._emit_notice(outgoing, link, room, f"invite removed in {r}") return True diff --git a/rrcd/resources.py b/rrcd/resources.py index 57bfd7e..ae9e082 100644 --- a/rrcd/resources.py +++ b/rrcd/resources.py @@ -416,7 +416,7 @@ class ResourceManager: with self.hub._state_lock: sess = self.hub.session_manager.sessions.get(link) peer_hash = sess.get("peer") if sess else None - room_members = self.hub.rooms.get(exp.room, set()) + room_members = self.hub.room_manager.get_room_members(exp.room) if peer_hash and room_members: notice_env = make_envelope( @@ -494,6 +494,10 @@ class ResourceManager: """ Send large payload via Resource. Returns True if successfully initiated, False otherwise. + + Note: This sends the resource envelope immediately, then creates + and advertises the resource. Should only be called when immediate + sending is desired (not when batching messages). """ if not self.hub.config.enable_resource_transfer: return False @@ -555,7 +559,9 @@ class ResourceManager: # Create and advertise resource try: - resource = RNS.Resource(payload, link, advertise=True, auto_compress=False) + resource = RNS.Resource( + payload, link, advertise=True, auto_compress=False + ) with self.hub._state_lock: self._active_resources.setdefault(link, set()).add(resource) diff --git a/rrcd/rooms.py b/rrcd/rooms.py new file mode 100644 index 0000000..bc91283 --- /dev/null +++ b/rrcd/rooms.py @@ -0,0 +1,702 @@ +"""Room management for RRCD hub. + +This module handles all room-related functionality including: +- Room membership tracking +- Room state (modes, topic, permissions) +- Room registry persistence to TOML +- Permission management (ops, voiced, bans) +- Invite tracking with expiration +""" + +from __future__ import annotations + +import logging +import os +import threading +import time +from typing import TYPE_CHECKING, Any + +import RNS + +if TYPE_CHECKING: + from .service import HubService + + +class RoomManager: + """Manages room memberships, state, permissions, and registry persistence.""" + + def __init__(self, hub: HubService) -> None: + self.hub = hub + self.log = logging.getLogger("rrcd.rooms") + + # Room memberships: room name -> set of links in that room + self.rooms: dict[str, set[RNS.Link]] = {} + + # Room state (hub-local conventions; no new on-wire message types). + # _room_state holds active in-memory state (and registered state for empty rooms). + # _room_registry holds registered rooms loaded from config. + self._room_state: dict[str, dict[str, Any]] = {} + self._room_registry: dict[str, dict[str, Any]] = {} + + self._room_registry_write_lock = threading.Lock() + + def clear_all(self) -> None: + """Clear all room state. Called during hub shutdown.""" + self.rooms.clear() + self._room_state.clear() + self._room_registry.clear() + + def get_room_members(self, room: str) -> set[RNS.Link]: + """Get set of links currently in a room.""" + return self.rooms.get(room, set()) + + def add_member(self, room: str, link: RNS.Link, *, founder: bytes | None = None) -> None: + """Add a link to a room, creating the room if needed.""" + if room not in self.rooms: + self.rooms[room] = set() + self._room_state_ensure(room, founder=founder) + + self.rooms.setdefault(room, set()).add(link) + + def remove_member(self, room: str, link: RNS.Link) -> None: + """Remove a link from a room, cleaning up empty rooms.""" + if room in self.rooms: + self.rooms[room].discard(link) + if not self.rooms[room]: + self.rooms.pop(room, None) + st = self._room_state_get(room) + # Clean up unregistered empty rooms + if st is not None and not st.get("registered"): + self._room_state.pop(room, None) + + def remove_member_from_all(self, link: RNS.Link) -> int: + """Remove a link from all rooms. Returns number of rooms left.""" + rooms_to_remove = [r for r, links in self.rooms.items() if link in links] + for room in rooms_to_remove: + self.remove_member(room, link) + return len(rooms_to_remove) + + def get_member_rooms(self, link: RNS.Link) -> list[str]: + """Get list of rooms a link is currently in.""" + return [room for room, links in self.rooms.items() if link in links] + + def get_stats(self) -> dict[str, Any]: + """Get room statistics for hub stats.""" + rooms_total = len(self.rooms) + memberships = sum(len(v) for v in self.rooms.values()) + top_rooms = sorted( + ((room, len(links)) for room, links in self.rooms.items()), + key=lambda x: (-x[1], x[0]), + )[:5] + return { + "rooms_total": rooms_total, + "memberships": memberships, + "top_rooms": top_rooms, + } + + # Room state management + + def _room_state_get(self, room: str) -> dict[str, Any] | None: + """Get room state dict if it exists.""" + return self._room_state.get(room) + + def _room_state_ensure( + self, room: str, *, founder: bytes | None = None + ) -> dict[str, Any]: + """Ensure room state exists, creating from registry or defaults.""" + st = self._room_state.get(room) + if st is not None: + if st.get("founder") is None and founder is not None: + st["founder"] = founder + st.setdefault("ops", set()).add(founder) + return st + + # Load from registry if registered + if room in self._room_registry: + base = self._room_registry[room] + invited = base.get("invited") + invited_dict: dict[bytes, float] = {} + if isinstance(invited, dict): + for k, v in invited.items(): + if isinstance(k, (bytes, bytearray)): + try: + invited_dict[bytes(k)] = float(v) + except Exception: + continue + st = { + "founder": base.get("founder"), + "registered": True, + "topic": base.get("topic"), + "moderated": bool(base.get("moderated", False)), + "invite_only": bool(base.get("invite_only", False)), + "topic_ops_only": bool(base.get("topic_ops_only", False)), + "no_outside_msgs": bool(base.get("no_outside_msgs", False)), + "private": bool(base.get("private", False)), + "key": base.get("key"), + "ops": set(base.get("ops", set())), + "voiced": set(base.get("voiced", set())), + "bans": set(base.get("bans", set())), + "invited": invited_dict, + "last_used_ts": base.get("last_used_ts"), + } + self._room_state[room] = st + return st + + # Create new unregistered room + st = { + "founder": founder, + "registered": False, + "topic": None, + "moderated": False, + "invite_only": False, + "topic_ops_only": False, + "no_outside_msgs": False, + "private": False, + "key": None, + "ops": set([founder]) if founder is not None else set(), + "voiced": set(), + "bans": set(), + "invited": {}, + "last_used_ts": None, + } + self._room_state[room] = st + return st + + def touch_room(self, room: str) -> None: + """Update last_used_ts for a room.""" + try: + st = self._room_state_ensure(room) + ts = float(time.time()) + st["last_used_ts"] = ts + reg = self._room_registry.get(room) + if isinstance(reg, dict): + reg["last_used_ts"] = ts + except Exception: + pass + + # Room modes and permissions + + def get_room_modes(self, room: str) -> dict[str, Any]: + """Get dict of room mode flags.""" + st = self._room_state_ensure(room) + registered = bool(st.get("registered", False)) + moderated = bool(st.get("moderated", False)) + invite_only = bool(st.get("invite_only", False)) + topic_ops_only = bool(st.get("topic_ops_only", False)) + no_outside_msgs = bool(st.get("no_outside_msgs", False)) + private = bool(st.get("private", False)) + key = st.get("key") + has_key = isinstance(key, str) and bool(key) + return { + "registered": registered, + "moderated": moderated, + "invite_only": invite_only, + "topic_ops_only": topic_ops_only, + "no_outside_msgs": no_outside_msgs, + "private": private, + "has_key": has_key, + } + + def get_room_mode_string(self, room: str) -> str: + """Get IRC-style mode string for a room.""" + m = self.get_room_modes(room) + flags: list[str] = [] + # Keep roughly IRC-ish order. + if m.get("invite_only"): + flags.append("i") + if m.get("has_key"): + flags.append("k") + if m.get("moderated"): + flags.append("m") + if m.get("no_outside_msgs"): + flags.append("n") + if m.get("private"): + flags.append("p") + if m.get("registered"): + flags.append("r") + if m.get("topic_ops_only"): + flags.append("t") + return "+" + "".join(flags) if flags else "(none)" + + def broadcast_room_mode( + self, room: str, outgoing: list[tuple[RNS.Link, bytes]] | None = None + ) -> None: + """Broadcast current room mode to all members.""" + mode_txt = self.get_room_mode_string(room) + recipients = list(self.get_room_members(room)) + for other in recipients: + self.hub._emit_notice( + outgoing, other, room, f"mode for {room} is now: {mode_txt}" + ) + + def is_room_moderated(self, room: str) -> bool: + """Check if room is moderated.""" + st = self._room_state_ensure(room) + return bool(st.get("moderated", False)) + + def is_room_op(self, room: str, peer_hash: bytes | None) -> bool: + """Check if peer is a room operator.""" + if peer_hash is None: + return False + if self.hub._is_server_op(peer_hash): + return True + st = self._room_state_ensure(room) + founder = st.get("founder") + if isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash: + return True + ops = st.get("ops") + return isinstance(ops, set) and peer_hash in ops + + def is_room_voiced(self, room: str, peer_hash: bytes | None) -> bool: + """Check if peer has voice in room.""" + if peer_hash is None: + return False + if self.is_room_op(room, peer_hash): + return True + st = self._room_state_ensure(room) + voiced = st.get("voiced") + return isinstance(voiced, set) and peer_hash in voiced + + def is_room_banned(self, room: str, peer_hash: bytes | None) -> bool: + """Check if peer is banned from room.""" + if peer_hash is None: + return False + st = self._room_state_ensure(room) + bans = st.get("bans") + return isinstance(bans, set) and peer_hash in bans + + # Invite management + + def is_invited(self, room: str, peer_hash: bytes) -> bool: + """Check if peer has a valid (non-expired) invite.""" + st = self._room_state_ensure(room) + inv = st.get("invited") + if not isinstance(inv, dict) or not inv: + return False + now = float(time.time()) + exp = inv.get(peer_hash) + try: + exp_f = float(exp) if exp is not None else 0.0 + except Exception: + exp_f = 0.0 + if exp_f <= now: + inv.pop(peer_hash, None) + return False + return True + + def prune_expired_invites(self, room: str) -> bool: + """Remove expired invites from a room. Returns True if any were removed.""" + st = self._room_state_ensure(room) + inv = st.get("invited") + if not isinstance(inv, dict) or not inv: + return False + now = float(time.time()) + removed_any = False + for h, exp in list(inv.items()): + try: + exp_f = float(exp) + except Exception: + exp_f = 0.0 + if exp_f <= now: + inv.pop(h, None) + removed_any = True + return removed_any + + # Room registry persistence + + def load_registry_from_path( + self, path: str, *, invite_timeout_s: float + ) -> tuple[dict[str, dict[str, Any]], str | None]: + """Load room registry from TOML file. Returns (registry, error_msg).""" + if not path or not os.path.exists(path): + return {}, None + + try: + from tomlkit import parse # type: ignore + except ImportError: + return {}, "missing dependency tomlkit" + + try: + with open(path, encoding="utf-8") as f: + doc = parse(f.read()) + except Exception as e: + return {}, f"parse error: {e}" + + rooms_section = doc.get("rooms") + if not isinstance(rooms_section, dict): + return {}, None + + registry: dict[str, dict[str, Any]] = {} + now = float(time.time()) + + for room_name, room_data in rooms_section.items(): + if not isinstance(room_data, dict): + continue + + founder = room_data.get("founder") + if isinstance(founder, str): + try: + founder = bytes.fromhex(founder.strip().lower().removeprefix("0x")) + except Exception: + founder = None + + topic = room_data.get("topic") + if not isinstance(topic, str): + topic = None + + moderated = bool(room_data.get("moderated", False)) + invite_only = bool(room_data.get("invite_only", False)) + topic_ops_only = bool(room_data.get("topic_ops_only", False)) + no_outside_msgs = bool(room_data.get("no_outside_msgs", False)) + private = bool(room_data.get("private", False)) + + key = room_data.get("key") + if not isinstance(key, str): + key = None + + operators = room_data.get("operators", []) + ops: set[bytes] = set() + if isinstance(operators, list): + for op in operators: + if isinstance(op, str): + try: + ops.add(bytes.fromhex(op.strip().lower().removeprefix("0x"))) + except Exception: + continue + + voiced_list = room_data.get("voiced", []) + voiced: set[bytes] = set() + if isinstance(voiced_list, list): + for v in voiced_list: + if isinstance(v, str): + try: + voiced.add(bytes.fromhex(v.strip().lower().removeprefix("0x"))) + except Exception: + continue + + bans_list = room_data.get("bans", []) + bans: set[bytes] = set() + if isinstance(bans_list, list): + for b in bans_list: + if isinstance(b, str): + try: + bans.add(bytes.fromhex(b.strip().lower().removeprefix("0x"))) + except Exception: + continue + + invited_dict = room_data.get("invited", {}) + invited: dict[bytes, float] = {} + if isinstance(invited_dict, dict): + for h, exp in invited_dict.items(): + if isinstance(h, str): + try: + h_bytes = bytes.fromhex(h.strip().lower().removeprefix("0x")) + exp_f = float(exp) + if exp_f > now: + invited[h_bytes] = exp_f + except Exception: + continue + + last_used_ts = room_data.get("last_used_ts") + try: + last_used_ts = float(last_used_ts) if last_used_ts is not None else None + except Exception: + last_used_ts = None + + registry[room_name] = { + "founder": founder, + "topic": topic, + "moderated": moderated, + "invite_only": invite_only, + "topic_ops_only": topic_ops_only, + "no_outside_msgs": no_outside_msgs, + "private": private, + "key": key, + "ops": ops, + "voiced": voiced, + "bans": bans, + "invited": invited, + "last_used_ts": last_used_ts, + } + + return registry, None + + def diff_registry_summary( + self, old: dict[str, dict[str, Any]], new: dict[str, dict[str, Any]] + ) -> list[str]: + """Generate human-readable summary of registry changes.""" + old_rooms = set(old.keys()) + new_rooms = set(new.keys()) + added = sorted(new_rooms - old_rooms) + removed = sorted(old_rooms - new_rooms) + + lines: list[str] = [] + if added: + preview = ", ".join(added[:10]) + suffix = "" if len(added) <= 10 else f" (+{len(added) - 10} more)" + lines.append(f"rooms_added={len(added)}: {preview}{suffix}") + if removed: + preview = ", ".join(removed[:10]) + suffix = "" if len(removed) <= 10 else f" (+{len(removed) - 10} more)" + lines.append(f"rooms_removed={len(removed)}: {preview}{suffix}") + if not lines: + lines.append(f"rooms_changed=0 (registered_rooms={len(new_rooms)})") + return lines + + def get_registry_path_for_writes(self) -> str | None: + """Get path to room registry file for write operations.""" + from .util import expand_path + + p = self.hub.config.room_registry_path + if not p: + return None + return expand_path(str(p)) + + def persist_room_state(self, link: RNS.Link, room: str | None) -> None: + """Persist room state to registry TOML file.""" + if room is None: + return + reg_path = self.get_registry_path_for_writes() + if not reg_path: + return + st = self._room_state_get(room) + if not st or not st.get("registered"): + return + + try: + from tomlkit import dumps, parse, table # type: ignore + except Exception: + return + + try: + with self._room_registry_write_lock: + file_stat = None + try: + file_stat = os.stat(reg_path) + except Exception: + file_stat = None + + with open(reg_path, encoding="utf-8") as f: + doc = parse(f.read()) + + rooms = doc.get("rooms") + if rooms is None: + rooms = table() + doc["rooms"] = rooms + + room_tbl = rooms.get(room) + if room_tbl is None: + room_tbl = table() + rooms[room] = room_tbl + + founder = st.get("founder") + if isinstance(founder, (bytes, bytearray)): + room_tbl["founder"] = bytes(founder).hex() + + topic = st.get("topic") + if isinstance(topic, str) and topic.strip(): + room_tbl["topic"] = topic + else: + if "topic" in room_tbl: + del room_tbl["topic"] + + room_tbl["moderated"] = bool(st.get("moderated", False)) + room_tbl["invite_only"] = bool(st.get("invite_only", False)) + room_tbl["topic_ops_only"] = bool(st.get("topic_ops_only", False)) + room_tbl["no_outside_msgs"] = bool(st.get("no_outside_msgs", False)) + + key = st.get("key") + if isinstance(key, str) and key: + room_tbl["key"] = key + else: + if "key" in room_tbl: + del room_tbl["key"] + + last_used_ts = st.get("last_used_ts") + if last_used_ts is None: + last_used_ts = float(time.time()) + try: + room_tbl["last_used_ts"] = float(last_used_ts) + except Exception: + room_tbl["last_used_ts"] = float(time.time()) + + ops = st.get("ops") + if isinstance(ops, set): + room_tbl["operators"] = sorted( + bytes(x).hex() for x in ops if isinstance(x, (bytes, bytearray)) + ) + + voiced = st.get("voiced") + if isinstance(voiced, set): + room_tbl["voiced"] = sorted( + bytes(x).hex() + for x in voiced + if isinstance(x, (bytes, bytearray)) + ) + + bans = st.get("bans") + if isinstance(bans, set): + room_tbl["bans"] = sorted( + bytes(x).hex() + for x in bans + if isinstance(x, (bytes, bytearray)) + ) + + invited = st.get("invited") + if isinstance(invited, dict): + inv_tbl = {} + now = float(time.time()) + for h, exp in invited.items(): + if not isinstance(h, (bytes, bytearray)): + continue + try: + exp_f = float(exp) + except Exception: + continue + if exp_f > now: + inv_tbl[bytes(h).hex()] = exp_f + room_tbl["invited"] = inv_tbl + + new_text = dumps(doc) + with open(reg_path, "w", encoding="utf-8") as f: + f.write(new_text) + + if file_stat is not None: + try: + os.chmod(reg_path, file_stat.st_mode) + except Exception: + pass + except Exception as e: + self.hub._notice_to(link, room, f"room config persist failed: {e}") + + def delete_room_from_registry(self, link: RNS.Link, room: str) -> None: + """Remove a room from the registry TOML file.""" + reg_path = self.get_registry_path_for_writes() + if not reg_path: + return + try: + from tomlkit import dumps, parse # type: ignore + except Exception: + return + + try: + with self._room_registry_write_lock: + file_stat = None + try: + file_stat = os.stat(reg_path) + except Exception: + file_stat = None + + with open(reg_path, encoding="utf-8") as f: + doc = parse(f.read()) + + rooms = doc.get("rooms") + if isinstance(rooms, dict) and room in rooms: + try: + del rooms[room] + except Exception: + rooms.pop(room, None) + + new_text = dumps(doc) + with open(reg_path, "w", encoding="utf-8") as f: + f.write(new_text) + + if file_stat is not None: + try: + os.chmod(reg_path, file_stat.st_mode) + except Exception: + pass + except Exception as e: + self.hub._notice_to(link, room, f"room unregister persist failed: {e}") + + def prune_unused_registered_rooms( + self, prune_after_s: float, started_wall_time: float + ) -> list[str]: + """ + Prune registered rooms that haven't been used recently. + + Returns list of pruned room names. + """ + now = float(time.time()) + rooms_to_prune: list[str] = [] + + for room, reg in list(self._room_registry.items()): + # Skip active rooms + if room in self.rooms and self.rooms.get(room): + continue + + last_used = reg.get("last_used_ts") + try: + last_used = float(last_used) if last_used is not None else None + except Exception: + last_used = None + if last_used is None: + # Never-used rooms are eligible after prune_after from process start + last_used = started_wall_time + + if (now - float(last_used)) < prune_after_s: + continue + + # Prune in-memory + self._room_registry.pop(room, None) + self._room_state.pop(room, None) + rooms_to_prune.append(room) + + return rooms_to_prune + + def merge_registry_into_state(self, registry: dict[str, dict[str, Any]]) -> None: + """ + Merge registry into live room state. + + Updates in-memory state for active rooms with registry data. + """ + for r, st in list(self._room_state.items()): + if not isinstance(st, dict): + continue + + reg = registry.get(r) + if reg is None: + # If a room was unregistered on disk, reflect that + if st.get("registered"): + st["registered"] = False + continue + + st["registered"] = True + + founder = reg.get("founder") + if isinstance(founder, (bytes, bytearray)): + st["founder"] = bytes(founder) + + topic = reg.get("topic") + if isinstance(topic, str): + st["topic"] = topic + + st["moderated"] = bool(reg.get("moderated", False)) + st["invite_only"] = bool(reg.get("invite_only", False)) + st["topic_ops_only"] = bool(reg.get("topic_ops_only", False)) + st["no_outside_msgs"] = bool(reg.get("no_outside_msgs", False)) + st["private"] = bool(reg.get("private", False)) + + key = reg.get("key") + if isinstance(key, str): + st["key"] = key + + ops = reg.get("ops") + if isinstance(ops, set): + st["ops"] = set(ops) + + voiced = reg.get("voiced") + if isinstance(voiced, set): + st["voiced"] = set(voiced) + + bans = reg.get("bans") + if isinstance(bans, set): + st["bans"] = set(bans) + + invited = reg.get("invited") + if isinstance(invited, dict): + st["invited"] = dict(invited) + + last_used_ts = reg.get("last_used_ts") + if last_used_ts is not None: + st["last_used_ts"] = last_used_ts diff --git a/rrcd/router.py b/rrcd/router.py index 7642591..30f1614 100644 --- a/rrcd/router.py +++ b/rrcd/router.py @@ -5,6 +5,11 @@ from typing import TYPE_CHECKING, Any import RNS + +class OutgoingList(list): + """Custom list that allows attaching callback attributes.""" + pass + from .codec import decode, encode from .constants import ( B_HELLO_CAPS, @@ -19,6 +24,7 @@ from .constants import ( K_ROOM, K_SRC, K_T, + RES_KIND_MOTD, T_HELLO, T_JOIN, T_JOINED, @@ -336,6 +342,22 @@ class MessageRouter: peer_hash=peer_hash, motd=self.hub.config.greeting, ) + + # Send MOTD after WELCOME (outside of outgoing queue to enable resource transfer) + # The outgoing queue will be sent first, then this callback will send the MOTD + if self.hub.config.greeting: + def send_motd(): + self.hub._send_text_smart( + link, + msg_type=T_NOTICE, + text=self.hub.config.greeting, + room=None, + kind=RES_KIND_MOTD, + ) + # Store callback to be executed after outgoing packets are sent + if not hasattr(outgoing, '_post_send_callbacks'): + outgoing._post_send_callbacks = [] # type: ignore + outgoing._post_send_callbacks.append(send_motd) # type: ignore def _handle_re_hello( self, @@ -362,12 +384,7 @@ class MessageRouter: # Remove this link from all room membership sets and prune empties. for r in old_rooms: - self.hub.rooms.get(r, set()).discard(link) - if r in self.hub.rooms and not self.hub.rooms[r]: - self.hub.rooms.pop(r, None) - st = self.hub._room_state_get(r) - if st is not None and not st.get("registered"): - self.hub._room_state.pop(r, None) + self.hub.room_manager.remove_member(r, link) new_nick = None @@ -448,15 +465,15 @@ class MessageRouter: return # If room is registered, load its state now. - if r in self.hub._room_registry: - self.hub._room_state_ensure(r) + if r in self.hub.room_manager._room_registry: + self.hub.room_manager._room_state_ensure(r) - st = self.hub._room_state_ensure(r) + st = self.hub.room_manager._room_state_ensure(r) # +i invite-only if bool(st.get("invite_only", False)): - is_invited = self.hub._is_invited(st, peer_hash) - if not self.hub._is_room_op(r, peer_hash) and not is_invited: + is_invited = self.hub.room_manager.is_invited(r, peer_hash) + if not self.hub.room_manager.is_room_op(r, peer_hash) and not is_invited: if self.hub.identity is not None: self.hub._emit_error( outgoing, @@ -470,8 +487,8 @@ class MessageRouter: # +k key/password (JOIN body must be the key string) key = st.get("key") if isinstance(key, str) and key: - is_invited = self.hub._is_invited(st, peer_hash) - if not self.hub._is_room_op(r, peer_hash) and not is_invited: + is_invited = self.hub.room_manager.is_invited(r, peer_hash) + if not self.hub.room_manager.is_room_op(r, peer_hash) and not is_invited: provided = body if isinstance(body, str) else None if provided != key: if self.hub.identity is not None: @@ -485,7 +502,7 @@ class MessageRouter: return # Room bans are room-local and apply to JOIN. - if self.hub._is_room_banned(r, peer_hash): + if self.hub.room_manager.is_room_banned(r, peer_hash): if self.hub.identity is not None: self.hub._emit_error( outgoing, @@ -497,12 +514,12 @@ class MessageRouter: return # If the room doesn't exist yet (in-memory), the first joiner is the founder. - if r not in self.hub.rooms: - self.hub.rooms[r] = set() - self.hub._room_state_ensure(r, founder=peer_hash) + if not self.hub.room_manager.get_room_members(r): + pass # room created by add_member + self.hub.room_manager._room_state_ensure(r, founder=peer_hash) sess["rooms"].add(r) - self.hub.rooms.setdefault(r, set()).add(link) + self.hub.room_manager.add_member(r, link) self.log.info( "JOIN peer=%s nick=%r room=%s link_id=%s", @@ -512,12 +529,12 @@ class MessageRouter: self.hub._fmt_link_id(link), ) - self.hub._touch_room(r) + self.hub.room_manager.touch_room(r) joined_body = None if self.hub.config.include_joined_member_list: members: list[bytes] = [] - for member_link in self.hub.rooms.get(r, set()): + for member_link in self.hub.room_manager.get_room_members(r): s = self.hub.session_manager.sessions.get(member_link) ph = s.get("peer") if s else None if isinstance(ph, (bytes, bytearray)): @@ -535,14 +552,14 @@ class MessageRouter: if isinstance(inv, dict) and peer_hash in inv: inv.pop(peer_hash, None) if bool(st.get("registered")): - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.persist_room_state(link, r) except Exception: pass try: registered = bool(st.get("registered", False)) topic = st.get("topic") if isinstance(st.get("topic"), str) else None - mode_txt = self.hub._room_mode_string(r) + mode_txt = self.hub.room_manager.get_room_mode_string(r) topic_txt = topic if topic else "(none)" reg_txt = "registered" if registered else "unregistered" self.hub._emit_notice( @@ -586,23 +603,23 @@ class MessageRouter: return sess["rooms"].discard(r) - if r in self.hub.rooms: - self.hub.rooms[r].discard(link) - if not self.hub.rooms[r]: - self.hub.rooms.pop(r, None) - st = self.hub._room_state_get(r) + if self.hub.room_manager.get_room_members(r): + self.hub.room_manager.remove_member(r, link) + if not self.hub.room_manager.get_room_members(r): + self.hub.room_manager.remove_member(r, link) + st = self.hub.room_manager._room_state_get(r) if st is not None: - self.hub._touch_room(r) + self.hub.room_manager.touch_room(r) if st.get("registered"): - self.hub._persist_room_state_to_registry(link, r) + self.hub.room_manager.persist_room_state(link, r) if st is not None and not st.get("registered"): - self.hub._room_state.pop(r, None) + self.hub.room_manager._room_state.pop(r, None) # Per spec: acknowledge PART with PARTED. parted_body = None if self.hub.config.include_joined_member_list: members: list[bytes] = [] - for member_link in self.hub.rooms.get(r, set()): + for member_link in self.hub.room_manager.get_room_members(r): s = self.hub.session_manager.sessions.get(member_link) ph = s.get("peer") if s else None if isinstance(ph, (bytes, bytearray)): @@ -701,10 +718,10 @@ class MessageRouter: # +n (no outside messages): when enabled, require membership. # When disabled (-n), allow sending to existing/registered rooms. st = None - if r in self.hub._room_registry: - st = self.hub._room_state_ensure(r) - elif r in self.hub.rooms: - st = self.hub._room_state_ensure(r) + if r in self.hub.room_manager._room_registry: + st = self.hub.room_manager._room_state_ensure(r) + elif self.hub.room_manager.get_room_members(r): + st = self.hub.room_manager._room_state_ensure(r) if st is None: if self.hub.identity is not None: @@ -729,7 +746,7 @@ class MessageRouter: return # Per-room moderation: bans and moderated mode. - if self.hub._is_room_banned(r, peer_hash): + if self.hub.room_manager.is_room_banned(r, peer_hash): if self.hub.identity is not None: self.hub._emit_error( outgoing, @@ -739,7 +756,7 @@ class MessageRouter: room=r, ) return - if self.hub._room_moderated(r) and not self.hub._is_room_voiced(r, peer_hash): + if self.hub.room_manager.is_room_moderated(r) and not self.hub.room_manager.is_room_voiced(r, peer_hash): if self.hub.identity is not None: self.hub._emit_error( outgoing, @@ -783,7 +800,7 @@ class MessageRouter: env[K_NICK] = n payload = encode(env) - for other in list(self.hub.rooms.get(r, set())): + for other in list(self.hub.room_manager.get_room_members(r)): self.hub._queue_payload(outgoing, other, payload) if self.log.isEnabledFor(logging.DEBUG): @@ -793,7 +810,7 @@ class MessageRouter: self.hub._fmt_hash(peer_hash), sess.get("nick"), r, - len(self.hub.rooms.get(r, set())), + len(self.hub.room_manager.get_room_members(r)), type(body).__name__, ) diff --git a/rrcd/service.py b/rrcd/service.py index b9863bf..93e39f5 100644 --- a/rrcd/service.py +++ b/rrcd/service.py @@ -27,7 +27,8 @@ from .constants import ( from .envelope import make_envelope from .logging_config import configure_logging from .resources import ResourceManager -from .router import MessageRouter +from .rooms import RoomManager +from .router import MessageRouter, OutgoingList from .session import SessionManager from .util import expand_path @@ -55,22 +56,17 @@ class HubService: # Resource manager for file/data transfers self.resource_manager = ResourceManager(self) + + # Room manager for room memberships and permissions + self.room_manager = RoomManager(self) self.identity: RNS.Identity | None = None self.destination: RNS.Destination | None = None - self.rooms: dict[str, set[RNS.Link]] = {} self._trusted: set[bytes] = set() self._banned: set[bytes] = set() - # Room state (hub-local conventions; no new on-wire message types). - # _room_state holds active in-memory state (and registered state for empty rooms). - # _room_registry holds registered rooms loaded from config. - self._room_state: dict[str, dict[str, Any]] = {} - self._room_registry: dict[str, dict[str, Any]] = {} - - self._room_registry_write_lock = threading.Lock() self._prune_thread: threading.Thread | None = None self._ping_thread: threading.Thread | None = None @@ -221,12 +217,6 @@ class HubService: self._fmt_link_id(link), ) - # The hub MOTD (message of the day) is delivered after WELCOME. - if g: - self._send_text_smart( - link, msg_type=T_NOTICE, text=g, room=None, outgoing=outgoing, kind=RES_KIND_MOTD - ) - def _inc(self, key: str, delta: int = 1) -> None: try: with self._state_lock: @@ -267,21 +257,51 @@ class HubService: # If it fits, send normally if self._packet_would_fit(link, payload): + self.log.debug( + "Text fits in packet link_id=%s bytes=%s", + self._fmt_link_id(link), + len(payload), + ) if outgoing is None: self._send(link, env) else: self._queue_env(outgoing, link, env) return + self.log.debug( + "Text too large for packet link_id=%s bytes=%s mtu_check_failed=True", + self._fmt_link_id(link), + len(payload), + ) + # Too large for packet - try resource if enabled and type is NOTICE - if ( + # Only use resources when NOT batching (outgoing=None), since resource + # creation happens immediately and would race with queued packets. + text_bytes = text.encode(encoding) + can_use_resource = ( self.config.enable_resource_transfer and msg_type == T_NOTICE - and len(text.encode(encoding)) <= self.config.max_resource_bytes - ): - text_bytes = text.encode(encoding) + and outgoing is None + and len(text_bytes) <= self.config.max_resource_bytes + ) + + self.log.debug( + "Resource check: enabled=%s type_is_notice=%s not_batching=%s size_ok=%s/%s", + self.config.enable_resource_transfer, + msg_type == T_NOTICE, + outgoing is None, + len(text_bytes), + self.config.max_resource_bytes, + ) + + if can_use_resource: + self.log.debug( + "Attempting to send via resource link_id=%s kind=%s", + self._fmt_link_id(link), + kind if kind is not None else RES_KIND_NOTICE, + ) resource_kind = kind if kind is not None else RES_KIND_NOTICE - if self._send_via_resource( + if self.resource_manager.send_via_resource( link, kind=resource_kind, payload=text_bytes, @@ -295,9 +315,19 @@ class HubService: len(text), ) return + else: + self.log.warning( + "Resource send failed, falling back to chunks link_id=%s", + self._fmt_link_id(link), + ) # Fall back to chunking for NOTICE if msg_type == T_NOTICE: + self.log.debug( + "Falling back to chunking link_id=%s outgoing_is_none=%s", + self._fmt_link_id(link), + outgoing is None, + ) if outgoing is None: outgoing = [] self._queue_notice_chunks(outgoing, link, room=room, text=text) @@ -444,7 +474,7 @@ class HubService: with self._state_lock: links = self.session_manager.clear_all() - self.rooms.clear() + self.room_manager.clear_all() self.resource_manager.clear_all() for link in links: @@ -562,219 +592,6 @@ class HubService: ) return changed - def _load_room_registry_from_path( - self, - reg_path: str, - *, - invite_timeout_s: float | None = None, - ) -> tuple[dict[str, dict[str, Any]], str | None]: - if not reg_path: - return {}, "room_registry_path is empty" - if not os.path.exists(reg_path): - return {}, f"room registry file not found: {reg_path}" - try: - from tomlkit import parse # type: ignore - except Exception: - return {}, "missing dependency tomlkit" - - try: - with open(reg_path, encoding="utf-8") as f: - doc = parse(f.read()) - except Exception as e: - return {}, f"failed to parse rooms registry: {e}" - - rooms = doc.get("rooms") - if rooms is None: - return {}, None - if not isinstance(rooms, dict): - return {}, "rooms registry: [rooms] must be a table" - - def _parse_list(cfg: dict[str, Any], name: str) -> set[bytes]: - out: set[bytes] = set() - lst = cfg.get(name) - if isinstance(lst, list): - for item in lst: - if not isinstance(item, str) or not item.strip(): - continue - try: - out.add(self._parse_identity_hash(item)) - except Exception: - continue - return out - - registry: dict[str, dict[str, Any]] = {} - for raw_room, raw_cfg in rooms.items(): - if not isinstance(raw_room, str): - continue - try: - room = self._norm_room(raw_room) - except Exception: - continue - if not isinstance(raw_cfg, dict): - continue - - founder_hex = raw_cfg.get("founder") - founder = None - if isinstance(founder_hex, str) and founder_hex.strip(): - try: - founder = self._parse_identity_hash(founder_hex) - except Exception: - founder = None - - topic = raw_cfg.get("topic") - if not isinstance(topic, str) or not topic.strip(): - topic = None - - moderated = bool(raw_cfg.get("moderated", False)) - - invite_only = bool(raw_cfg.get("invite_only", False)) - topic_ops_only = bool(raw_cfg.get("topic_ops_only", False)) - no_outside_msgs = bool(raw_cfg.get("no_outside_msgs", False)) - - key = raw_cfg.get("key") - if not isinstance(key, str) or not key: - key = None - - last_used_ts = raw_cfg.get("last_used_ts") - try: - last_used_ts = float(last_used_ts) if last_used_ts is not None else None - except Exception: - last_used_ts = None - - ops = _parse_list(raw_cfg, "operators") - voiced = _parse_list(raw_cfg, "voiced") - bans = _parse_list(raw_cfg, "bans") - - invited: dict[bytes, float] = {} - raw_inv = raw_cfg.get("invited") - now = float(time.time()) - ttl_src = invite_timeout_s - if ttl_src is None: - ttl_src = self.config.room_invite_timeout_s - ttl = float(ttl_src) if ttl_src else 0.0 - if ttl <= 0: - ttl = 900.0 - - # New format: invited is a table mapping hex->expiry_ts - if isinstance(raw_inv, dict): - for k, v in raw_inv.items(): - if not isinstance(k, str) or not k.strip(): - continue - try: - h = self._parse_identity_hash(k) - except Exception: - continue - try: - exp = float(v) - except Exception: - continue - if exp > now: - invited[h] = exp - - # Back-compat: invited as a list of identity hashes => grant ttl from now - elif isinstance(raw_inv, list): - for item in raw_inv: - if not isinstance(item, str) or not item.strip(): - continue - try: - h = self._parse_identity_hash(item) - except Exception: - continue - invited[h] = now + ttl - - if founder is not None: - ops.add(founder) - - registry[room] = { - "founder": founder, - "registered": True, - "topic": topic, - "moderated": moderated, - "invite_only": invite_only, - "topic_ops_only": topic_ops_only, - "no_outside_msgs": no_outside_msgs, - "key": key, - "ops": ops, - "voiced": voiced, - "bans": bans, - "invited": invited, - "last_used_ts": last_used_ts, - } - - return registry, None - - def _diff_room_registry_summary( - self, old: dict[str, dict[str, Any]], new: dict[str, dict[str, Any]] - ) -> list[str]: - old_rooms = set(old.keys()) - new_rooms = set(new.keys()) - added = sorted(new_rooms - old_rooms) - removed = sorted(old_rooms - new_rooms) - - lines: list[str] = [] - if added: - preview = ", ".join(added[:10]) - suffix = "" if len(added) <= 10 else f" (+{len(added) - 10} more)" - lines.append(f"rooms_added={len(added)}: {preview}{suffix}") - if removed: - preview = ", ".join(removed[:10]) - suffix = "" if len(removed) <= 10 else f" (+{len(removed) - 10} more)" - lines.append(f"rooms_removed={len(removed)}: {preview}{suffix}") - if not lines: - lines.append(f"rooms_changed=0 (registered_rooms={len(new_rooms)})") - return lines - - def _room_modes(self, room: str) -> dict[str, Any]: - st = self._room_state_ensure(room) - registered = bool(st.get("registered", False)) - moderated = bool(st.get("moderated", False)) - invite_only = bool(st.get("invite_only", False)) - topic_ops_only = bool(st.get("topic_ops_only", False)) - no_outside_msgs = bool(st.get("no_outside_msgs", False)) - private = bool(st.get("private", False)) - key = st.get("key") - has_key = isinstance(key, str) and bool(key) - return { - "registered": registered, - "moderated": moderated, - "invite_only": invite_only, - "topic_ops_only": topic_ops_only, - "no_outside_msgs": no_outside_msgs, - "private": private, - "has_key": has_key, - } - - def _room_mode_string(self, room: str) -> str: - m = self._room_modes(room) - flags: list[str] = [] - # Keep roughly IRC-ish order. - if m.get("invite_only"): - flags.append("i") - if m.get("has_key"): - flags.append("k") - if m.get("moderated"): - flags.append("m") - if m.get("no_outside_msgs"): - flags.append("n") - if m.get("private"): - flags.append("p") - if m.get("registered"): - flags.append("r") - if m.get("topic_ops_only"): - flags.append("t") - return "+" + "".join(flags) if flags else "(none)" - - def _broadcast_room_mode( - self, room: str, outgoing: list[tuple[RNS.Link, bytes]] | None = None - ) -> None: - mode_txt = self._room_mode_string(room) - with self._state_lock: - recipients = list(self.rooms.get(room, set())) - for other in recipients: - self._emit_notice( - outgoing, other, room, f"mode for {room} is now: {mode_txt}" - ) - def _ensure_worker_threads(self) -> None: # Announce loop if self._announce_thread is None or not self._announce_thread.is_alive(): @@ -829,7 +646,7 @@ class HubService: old_cfg = self.config old_trusted = set(self._trusted) old_banned = set(self._banned) - old_registry = dict(self._room_registry) + old_registry = dict(self.room_manager._room_registry) # Stage config parse try: @@ -865,7 +682,7 @@ class HubService: if new_cfg.room_registry_path else "" ) - new_registry, reg_err = self._load_room_registry_from_path( + new_registry, reg_err = self.room_manager.load_registry_from_path( reg_path, invite_timeout_s=new_cfg.room_invite_timeout_s, ) @@ -878,57 +695,11 @@ class HubService: self.config = new_cfg self._trusted = new_trusted self._banned = new_banned - self._room_registry = new_registry + self.room_manager._room_registry = new_registry # Merge registry into live per-room state (for active rooms). # This makes /reload take effect immediately for existing members. - for r, st in list(self._room_state.items()): - if not isinstance(st, dict): - continue - - reg = self._room_registry.get(r) - if reg is None: - # If a room was unregistered on disk, reflect that. - if st.get("registered"): - st["registered"] = False - continue - - st["registered"] = True - - founder = reg.get("founder") - if isinstance(founder, (bytes, bytearray)): - st["founder"] = bytes(founder) - - # Simple scalar fields - for key in ( - "topic", - "moderated", - "invite_only", - "topic_ops_only", - "no_outside_msgs", - "key", - "last_used_ts", - ): - if key in reg: - st[key] = reg.get(key) - - # Set fields - for key in ("ops", "voiced", "bans"): - v = reg.get(key) - if isinstance(v, set): - st[key] = set(v) - - # Invites (dict[bytes, float]) - inv = reg.get("invited") - if isinstance(inv, dict): - st["invited"] = dict(inv) - - # Ensure founder stays op. - founder_st = st.get("founder") - if isinstance(founder_st, (bytes, bytearray)): - ops = st.setdefault("ops", set()) - if isinstance(ops, set): - ops.add(bytes(founder_st)) + self.room_manager.merge_registry_into_state(new_registry) self._ensure_worker_threads() @@ -939,7 +710,7 @@ class HubService: self.log.exception("Failed to reconfigure logging") cfg_changes = self._diff_config_summary(old_cfg, new_cfg) - room_changes = self._diff_room_registry_summary(old_registry, new_registry) + room_changes = self.room_manager.diff_registry_summary(old_registry, new_registry) lines: list[str] = [] lines.append( @@ -963,160 +734,20 @@ class HubService: self._emit_notice(outgoing, link, room, "\n".join(lines)) - def _room_registry_path_for_writes(self) -> str | None: - p = self.config.room_registry_path - if not p: - return - return expand_path(str(p)) - def _load_registered_rooms_from_registry(self) -> None: - reg_path = self._room_registry_path_for_writes() + reg_path = self.room_manager.get_registry_path_for_writes() if not reg_path: return - registry, err = self._load_room_registry_from_path(reg_path) + registry, err = self.room_manager.load_registry_from_path( + reg_path, invite_timeout_s=self.config.room_invite_timeout_s + ) if err is not None: return - self._room_registry = registry - - def _room_state_get(self, room: str) -> dict[str, Any] | None: - return self._room_state.get(room) - - def _room_state_ensure( - self, room: str, *, founder: bytes | None = None - ) -> dict[str, Any]: - st = self._room_state.get(room) - if st is not None: - if st.get("founder") is None and founder is not None: - st["founder"] = founder - st.setdefault("ops", set()).add(founder) - return st - - if room in self._room_registry: - base = self._room_registry[room] - invited = base.get("invited") - invited_dict: dict[bytes, float] = {} - if isinstance(invited, dict): - for k, v in invited.items(): - if isinstance(k, (bytes, bytearray)): - try: - invited_dict[bytes(k)] = float(v) - except Exception: - continue - st = { - "founder": base.get("founder"), - "registered": True, - "topic": base.get("topic"), - "moderated": bool(base.get("moderated", False)), - "invite_only": bool(base.get("invite_only", False)), - "topic_ops_only": bool(base.get("topic_ops_only", False)), - "no_outside_msgs": bool(base.get("no_outside_msgs", False)), - "private": bool(base.get("private", False)), - "key": base.get("key"), - "ops": set(base.get("ops", set())), - "voiced": set(base.get("voiced", set())), - "bans": set(base.get("bans", set())), - "invited": invited_dict, - "last_used_ts": base.get("last_used_ts"), - } - self._room_state[room] = st - return st - - st = { - "founder": founder, - "registered": False, - "topic": None, - "moderated": False, - "invite_only": False, - "topic_ops_only": False, - "no_outside_msgs": False, - "private": False, - "key": None, - "ops": set([founder]) if founder is not None else set(), - "voiced": set(), - "bans": set(), - "invited": {}, - "last_used_ts": None, - } - self._room_state[room] = st - return st - - def _prune_expired_invites(self, st: dict[str, Any]) -> bool: - inv = st.get("invited") - if not isinstance(inv, dict) or not inv: - return False - now = float(time.time()) - removed_any = False - for h, exp in list(inv.items()): - try: - exp_f = float(exp) - except Exception: - exp_f = 0.0 - if exp_f <= now: - inv.pop(h, None) - removed_any = True - return removed_any - - def _is_invited(self, st: dict[str, Any], peer_hash: bytes) -> bool: - inv = st.get("invited") - if not isinstance(inv, dict) or not inv: - return False - now = float(time.time()) - exp = inv.get(peer_hash) - try: - exp_f = float(exp) if exp is not None else 0.0 - except Exception: - exp_f = 0.0 - if exp_f <= now: - inv.pop(peer_hash, None) - return False - return True - - def _touch_room(self, room: str) -> None: - try: - st = self._room_state_ensure(room) - ts = float(time.time()) - st["last_used_ts"] = ts - reg = self._room_registry.get(room) - if isinstance(reg, dict): - reg["last_used_ts"] = ts - except Exception: - pass + self.room_manager._room_registry = registry def _is_server_op(self, peer_hash: bytes | None) -> bool: return self._is_trusted(peer_hash) - def _is_room_op(self, room: str, peer_hash: bytes | None) -> bool: - if peer_hash is None: - return False - if self._is_server_op(peer_hash): - return True - st = self._room_state_ensure(room) - founder = st.get("founder") - if isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash: - return True - ops = st.get("ops") - return isinstance(ops, set) and peer_hash in ops - - def _is_room_voiced(self, room: str, peer_hash: bytes | None) -> bool: - if peer_hash is None: - return False - if self._is_room_op(room, peer_hash): - return True - st = self._room_state_ensure(room) - voiced = st.get("voiced") - return isinstance(voiced, set) and peer_hash in voiced - - def _is_room_banned(self, room: str, peer_hash: bytes | None) -> bool: - if peer_hash is None: - return False - st = self._room_state_ensure(room) - bans = st.get("bans") - return isinstance(bans, set) and peer_hash in bans - - def _room_moderated(self, room: str) -> bool: - st = self._room_state_ensure(room) - return bool(st.get("moderated", False)) - def _resolve_identity_hash( self, token: str, *, room: str | None = None ) -> bytes | None: @@ -1160,211 +791,6 @@ class HubService: except Exception: return (None, []) - def _persist_room_state_to_registry(self, link: RNS.Link, room: str | None) -> None: - if room is None: - return - reg_path = self._room_registry_path_for_writes() - if not reg_path: - return - st = self._room_state_get(room) - if not st or not st.get("registered"): - return - - try: - from tomlkit import dumps, parse, table # type: ignore - except Exception: - return - - try: - with self._room_registry_write_lock: - file_stat = None - try: - file_stat = os.stat(reg_path) - except Exception: - file_stat = None - - with open(reg_path, encoding="utf-8") as f: - doc = parse(f.read()) - - rooms = doc.get("rooms") - if rooms is None: - rooms = table() - doc["rooms"] = rooms - - room_tbl = rooms.get(room) - if room_tbl is None: - room_tbl = table() - rooms[room] = room_tbl - - founder = st.get("founder") - if isinstance(founder, (bytes, bytearray)): - room_tbl["founder"] = bytes(founder).hex() - - topic = st.get("topic") - if isinstance(topic, str) and topic.strip(): - room_tbl["topic"] = topic - else: - if "topic" in room_tbl: - del room_tbl["topic"] - - room_tbl["moderated"] = bool(st.get("moderated", False)) - - room_tbl["invite_only"] = bool(st.get("invite_only", False)) - room_tbl["topic_ops_only"] = bool(st.get("topic_ops_only", False)) - room_tbl["no_outside_msgs"] = bool(st.get("no_outside_msgs", False)) - - key = st.get("key") - if isinstance(key, str) and key: - room_tbl["key"] = key - else: - if "key" in room_tbl: - del room_tbl["key"] - - last_used_ts = st.get("last_used_ts") - if last_used_ts is None: - last_used_ts = float(time.time()) - try: - room_tbl["last_used_ts"] = float(last_used_ts) - except Exception: - room_tbl["last_used_ts"] = float(time.time()) - - ops = st.get("ops") - if isinstance(ops, set): - room_tbl["operators"] = sorted( - bytes(x).hex() for x in ops if isinstance(x, (bytes, bytearray)) - ) - - voiced = st.get("voiced") - if isinstance(voiced, set): - room_tbl["voiced"] = sorted( - bytes(x).hex() - for x in voiced - if isinstance(x, (bytes, bytearray)) - ) - - bans = st.get("bans") - if isinstance(bans, set): - room_tbl["bans"] = sorted( - bytes(x).hex() - for x in bans - if isinstance(x, (bytes, bytearray)) - ) - - invited = st.get("invited") - if isinstance(invited, dict): - inv_tbl = {} - now = float(time.time()) - for h, exp in invited.items(): - if not isinstance(h, (bytes, bytearray)): - continue - try: - exp_f = float(exp) - except Exception: - continue - if exp_f > now: - inv_tbl[bytes(h).hex()] = exp_f - room_tbl["invited"] = inv_tbl - - new_text = dumps(doc) - with open(reg_path, "w", encoding="utf-8") as f: - f.write(new_text) - - if file_stat is not None: - try: - os.chmod(reg_path, file_stat.st_mode) - except Exception: - pass - except Exception as e: - self._notice_to(link, room, f"room config persist failed: {e}") - - def _delete_room_from_registry(self, link: RNS.Link, room: str) -> None: - reg_path = self._room_registry_path_for_writes() - if not reg_path: - return - try: - from tomlkit import dumps, parse # type: ignore - except Exception: - return - - try: - with self._room_registry_write_lock: - file_stat = None - try: - file_stat = os.stat(reg_path) - except Exception: - file_stat = None - - with open(reg_path, encoding="utf-8") as f: - doc = parse(f.read()) - - rooms = doc.get("rooms") - if isinstance(rooms, dict) and room in rooms: - try: - del rooms[room] - except Exception: - rooms.pop(room, None) - - new_text = dumps(doc) - with open(reg_path, "w", encoding="utf-8") as f: - f.write(new_text) - - if file_stat is not None: - try: - os.chmod(reg_path, file_stat.st_mode) - except Exception: - pass - except Exception as e: - self._notice_to(link, room, f"room unregister persist failed: {e}") - - def _prune_loop(self) -> None: - while not self._shutdown.is_set(): - interval = float(self.config.room_registry_prune_interval_s) - prune_after = float(self.config.room_registry_prune_after_s) - if interval <= 0 or prune_after <= 0: - time.sleep(1.0) - continue - - time.sleep(interval) - if self._shutdown.is_set(): - break - - now = float(time.time()) - - rooms_to_prune: list[str] = [] - dummy_link: RNS.Link | None = None - - with self._state_lock: - dummy_link = next(iter(self.session_manager.sessions.keys()), None) - - for room, reg in list(self._room_registry.items()): - # Skip active rooms. - if room in self.rooms and self.rooms.get(room): - continue - - last_used = reg.get("last_used_ts") - try: - last_used = float(last_used) if last_used is not None else None - except Exception: - last_used = None - if last_used is None: - # Never-used rooms are eligible after prune_after from process start. - last_used = self._started_wall_time or now - - if (now - float(last_used)) < prune_after: - continue - - # Prune in-memory under lock. - self._room_registry.pop(room, None) - self._room_state.pop(room, None) - rooms_to_prune.append(room) - - if dummy_link is not None: - for room in rooms_to_prune: - self._delete_room_from_registry(dummy_link, room) - - for room in rooms_to_prune: - self.log.info("Pruned unused registered room %s", room) - def _resource_cleanup_loop(self) -> None: """Periodically cleanup expired resource expectations.""" while not self._shutdown.is_set(): @@ -1377,6 +803,35 @@ class HubService: except Exception: self.log.exception("Resource cleanup failed") + def _prune_loop(self) -> None: + """Periodically prune unused registered rooms.""" + while not self._shutdown.is_set(): + interval = float(self.config.room_registry_prune_interval_s) + prune_after = float(self.config.room_registry_prune_after_s) + if interval <= 0 or prune_after <= 0: + time.sleep(1.0) + continue + + time.sleep(interval) + if self._shutdown.is_set(): + break + + rooms_to_prune: list[str] = [] + dummy_link: RNS.Link | None = None + + with self._state_lock: + dummy_link = next(iter(self.session_manager.sessions.keys()), None) + rooms_to_prune = self.room_manager.prune_unused_registered_rooms( + prune_after, self._started_wall_time or time.time() + ) + + if dummy_link is not None: + for room in rooms_to_prune: + self.room_manager.delete_room_from_registry(dummy_link, room) + + for room in rooms_to_prune: + self.log.info("Pruned unused registered room %s", room) + def _config_path_for_writes(self) -> str | None: p = self.config.config_path if not p: @@ -1519,13 +974,10 @@ class HubService: sessions_welcomed = session_stats["welcomed"] sessions_identified = session_stats["identified"] - rooms_total = len(self.rooms) - memberships = sum(len(v) for v in self.rooms.values()) - - top_rooms = sorted( - ((room, len(links)) for room, links in self.rooms.items()), - key=lambda x: (-x[1], x[0]), - )[:5] + room_stats = self.room_manager.get_stats() + rooms_total = room_stats["rooms_total"] + memberships = room_stats["memberships"] + top_rooms = room_stats["top_rooms"] trusted_count = len(self._trusted) banned_count = len(self._banned) @@ -1652,6 +1104,8 @@ class HubService: peer_hash=sess.get("peer"), motd=self.config.greeting, ) + + # Send queued WELCOME first for out_link, payload in outgoing: self._inc("bytes_out", len(payload)) try: @@ -1670,6 +1124,21 @@ class HubService: len(payload), exc_info=True, ) + + # Now send MOTD via resource or chunks (after WELCOME is sent) + if self.config.greeting: + self.log.debug( + "Sending MOTD link_id=%s len=%s", + self._fmt_link_id(link), + len(self.config.greeting), + ) + self._send_text_smart( + link, + msg_type=T_NOTICE, + text=self.config.greeting, + room=None, + kind=RES_KIND_MOTD, + ) def _on_close(self, link: RNS.Link) -> None: peer = None @@ -1731,7 +1200,7 @@ class HubService: # Packet callbacks can occur concurrently with other link callbacks and # background worker threads. Keep state mutations under the shared lock, # but avoid holding the lock while sending packets via RNS. - outgoing: list[tuple[RNS.Link, bytes]] = [] + outgoing: list[tuple[RNS.Link, bytes]] = OutgoingList() with self._state_lock: self._on_packet_locked(link, data, outgoing) @@ -1760,6 +1229,14 @@ class HubService: len(payload), exc_info=True, ) + + # Execute any post-send callbacks (e.g., for MOTD after WELCOME) + if hasattr(outgoing, '_post_send_callbacks'): + for callback in outgoing._post_send_callbacks: # type: ignore + try: + callback() + except Exception: + self.log.exception("Post-send callback failed") def _on_packet_locked( self, diff --git a/rrcd/session.py b/rrcd/session.py index 4ed4aad..7ad5813 100644 --- a/rrcd/session.py +++ b/rrcd/session.py @@ -130,12 +130,7 @@ class SessionManager: # Clean up room memberships for room in list(sess["rooms"]): - self.hub.rooms.get(room, set()).discard(link) - if room in self.hub.rooms and not self.hub.rooms[room]: - self.hub.rooms.pop(room, None) - st = self.hub._room_state_get(room) - if st is not None and not st.get("registered"): - self.hub._room_state.pop(room, None) + self.hub.room_manager.remove_member(room, link) return peer, nick, rooms_count