refactor room management

This commit is contained in:
kc1awv
2026-01-07 10:57:35 -05:00
parent 36e06ebfe8
commit 7e5675fdd7
7 changed files with 971 additions and 774 deletions
+1 -1
View File
@@ -49,7 +49,7 @@ version = {attr = "rrcd.__version__"}
[tool.ruff]
target-version = "py311"
line-length = 88
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "B", "UP"]
+83 -83
View File
@@ -79,14 +79,14 @@ class CommandHandler:
# List all registered, non-private rooms with their topics
with self.hub._state_lock:
registered_rooms = []
for room_name, st in self.hub._room_state.items():
for room_name, st in self.hub.room_manager._room_state.items():
if st.get("registered") and not st.get("private"):
topic = st.get("topic")
registered_rooms.append((room_name, topic))
# Also check room registry for rooms not currently in room_state
for room_name, reg in self.hub._room_registry.items():
if room_name not in self.hub._room_state:
for room_name, reg in self.hub.room_manager._room_registry.items():
if room_name not in self.hub.room_manager._room_state:
if not reg.get("private"):
topic = reg.get("topic")
registered_rooms.append((room_name, topic))
@@ -123,14 +123,14 @@ class CommandHandler:
return True
# Check if room is private - only server operators can see private rooms
st = self.hub._room_state_get(r)
st = self.hub.room_manager._room_state_get(r)
if st and st.get("private"):
if not self.hub._is_server_op(peer_hash):
self._emit_notice(outgoing, link, None, f"room {r} is private")
return True
members = []
for other in sorted(self.hub.rooms.get(r, set()), key=lambda x: id(x)):
for other in sorted(self.hub.room_manager.get_room_members(r), key=lambda x: id(x)):
s = self.hub.session_manager.sessions.get(other)
if not s:
continue
@@ -164,7 +164,7 @@ class CommandHandler:
self._emit_notice(outgoing, link, room, f"bad room: {e}")
return True
if not self.hub._is_room_op(r, peer_hash):
if not self.hub.room_manager.is_room_op(r, peer_hash):
if self.hub.identity is not None:
self._emit_error(
outgoing,
@@ -190,10 +190,10 @@ class CommandHandler:
return True
tsess["rooms"].discard(r)
if r in self.hub.rooms:
self.hub.rooms[r].discard(target_link)
if not self.hub.rooms[r]:
self.hub.rooms.pop(r, None)
if self.hub.room_manager.get_room_members(r):
self.hub.room_manager.rooms[r].discard(target_link)
if not self.hub.room_manager.rooms[r]:
pass # room cleanup handled by room_manager
if self.hub.identity is not None:
self._emit_error(
@@ -327,11 +327,11 @@ class CommandHandler:
)
return True
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
# Clean up expired invites (best-effort).
if self.hub._prune_expired_invites(st) and bool(st.get("registered")):
self.hub._persist_room_state_to_registry(link, r)
if self.hub.room_manager.prune_expired_invites(r) and bool(st.get("registered")):
self.hub.room_manager.persist_room_state(link, r)
founder = st.get("founder")
if not (
isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash
@@ -346,7 +346,7 @@ class CommandHandler:
)
return True
if not self.hub._room_registry_path_for_writes():
if not self.hub.room_manager.get_registry_path_for_writes():
self._emit_notice(
outgoing, link, room, "cannot register room: no room_registry_path"
)
@@ -357,10 +357,10 @@ class CommandHandler:
st["topic_ops_only"] = True
if isinstance(founder, (bytes, bytearray)):
st.setdefault("ops", set()).add(bytes(founder))
self.hub._touch_room(r)
self.hub.room_manager.touch_room(r)
# Ensure registry mirrors registered rooms.
self.hub._room_registry[r] = {
self.hub.room_manager._room_registry[r] = {
"founder": bytes(founder)
if isinstance(founder, (bytes, bytearray))
else None,
@@ -379,7 +379,7 @@ class CommandHandler:
"last_used_ts": st.get("last_used_ts"),
}
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(outgoing, link, room, f"registered room {r}")
return True
@@ -403,7 +403,7 @@ class CommandHandler:
)
return True
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
founder = st.get("founder")
if not (
isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash
@@ -423,11 +423,11 @@ class CommandHandler:
return True
st["registered"] = False
self.hub._room_registry.pop(r, None)
self.hub.room_manager._room_registry.pop(r, None)
self.hub._delete_room_from_registry(link, r)
# Drop state if empty.
if r not in self.hub.rooms or not self.hub.rooms.get(r):
self.hub._room_state.pop(r, None)
if not self.hub.room_manager.get_room_members(r) or not self.hub.room_manager.get_room_members(r):
self.hub.room_manager._room_state.pop(r, None)
self._emit_notice(outgoing, link, room, f"unregistered room {r}")
return True
@@ -440,7 +440,7 @@ class CommandHandler:
except Exception as e:
self._emit_notice(outgoing, link, None, f"bad room: {e}")
return True
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
if len(parts) == 2:
topic = st.get("topic")
self._emit_notice(
@@ -451,8 +451,8 @@ class CommandHandler:
)
return True
if not self.hub._is_room_op(r, peer_hash):
st = self.hub._room_state_ensure(r)
if not self.hub.room_manager.is_room_op(r, peer_hash):
st = self.hub.room_manager._room_state_ensure(r)
if bool(st.get("topic_ops_only", False)):
if self.hub.identity is not None:
self._emit_error(
@@ -466,10 +466,10 @@ class CommandHandler:
topic = " ".join(parts[2:]).strip()
st["topic"] = topic if topic else None
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
# Broadcast topic change to current members.
for other in list(self.hub.rooms.get(r, set())):
for other in list(self.hub.room_manager.get_room_members(r)):
self._emit_notice(
outgoing,
other,
@@ -489,7 +489,7 @@ class CommandHandler:
except Exception as e:
self._emit_notice(outgoing, link, None, f"bad room: {e}")
return True
if not self.hub._is_room_op(r, peer_hash):
if not self.hub.room_manager.is_room_op(r, peer_hash):
if self.hub.identity is not None:
self._emit_error(
outgoing,
@@ -507,7 +507,7 @@ class CommandHandler:
)
return True
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
founder = st.get("founder")
founder_b = (
bytes(founder) if isinstance(founder, (bytes, bytearray)) else None
@@ -520,8 +520,8 @@ class CommandHandler:
st["ops"] = ops
if cmd == "op":
ops.add(target_hash)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(outgoing, link, room, f"op granted in {r}")
return True
else:
@@ -529,8 +529,8 @@ class CommandHandler:
self._emit_notice(outgoing, link, room, "cannot deop founder")
return True
ops.discard(target_hash)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(outgoing, link, room, f"op removed in {r}")
return True
@@ -540,14 +540,14 @@ class CommandHandler:
st["voiced"] = voiced
if cmd == "voice":
voiced.add(target_hash)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(outgoing, link, room, f"voice granted in {r}")
return True
else:
voiced.discard(target_hash)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(outgoing, link, room, f"voice removed in {r}")
return True
@@ -565,7 +565,7 @@ class CommandHandler:
except Exception as e:
self._emit_notice(outgoing, link, None, f"bad room: {e}")
return True
if not self.hub._is_room_op(r, peer_hash):
if not self.hub.room_manager.is_room_op(r, peer_hash):
if self.hub.identity is not None:
self._emit_error(
outgoing,
@@ -576,41 +576,41 @@ class CommandHandler:
)
return True
flag = parts[2].strip().lower()
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
if flag in ("+m", "-m"):
st["moderated"] = flag == "+m"
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub._broadcast_room_mode(r, outgoing)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self.hub.room_manager.broadcast_room_mode(r, outgoing)
return True
if flag in ("+i", "-i"):
st["invite_only"] = flag == "+i"
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub._broadcast_room_mode(r, outgoing)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self.hub.room_manager.broadcast_room_mode(r, outgoing)
return True
if flag in ("+t", "-t"):
st["topic_ops_only"] = flag == "+t"
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub._broadcast_room_mode(r, outgoing)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self.hub.room_manager.broadcast_room_mode(r, outgoing)
return True
if flag in ("+n", "-n"):
st["no_outside_msgs"] = flag == "+n"
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub._broadcast_room_mode(r, outgoing)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self.hub.room_manager.broadcast_room_mode(r, outgoing)
return True
if flag in ("+p", "-p"):
st["private"] = flag == "+p"
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub._broadcast_room_mode(r, outgoing)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self.hub.room_manager.broadcast_room_mode(r, outgoing)
return True
if flag in ("+k", "-k"):
@@ -627,9 +627,9 @@ class CommandHandler:
st["key"] = key
else:
st["key"] = None
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub._broadcast_room_mode(r, outgoing)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self.hub.room_manager.broadcast_room_mode(r, outgoing)
return True
if flag in ("+r", "-r"):
@@ -676,9 +676,9 @@ class CommandHandler:
return True
ops.discard(target_hash)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
for other in list(self.hub.rooms.get(r, set())):
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
for other in list(self.hub.room_manager.get_room_members(r)):
self._emit_notice(
outgoing,
other,
@@ -696,9 +696,9 @@ class CommandHandler:
else:
voiced.discard(target_hash)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
for other in list(self.hub.rooms.get(r, set())):
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
for other in list(self.hub.room_manager.get_room_members(r)):
self._emit_notice(
outgoing,
other,
@@ -733,7 +733,7 @@ class CommandHandler:
op = parts[2].strip().lower()
if op == "list":
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
bans = st.get("bans")
if not isinstance(bans, set) or not bans:
self._emit_notice(outgoing, link, room, f"no bans in {r}")
@@ -761,7 +761,7 @@ class CommandHandler:
)
return True
if not self.hub._is_room_op(r, peer_hash):
if not self.hub.room_manager.is_room_op(r, peer_hash):
if self.hub.identity is not None:
self._emit_error(
outgoing,
@@ -779,7 +779,7 @@ class CommandHandler:
)
return True
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
bans = st.setdefault("bans", set())
if not isinstance(bans, set):
bans = set()
@@ -787,16 +787,16 @@ class CommandHandler:
if op == "add":
bans.add(target_hash)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
# If currently present in room, remove them.
for other in list(self.hub.rooms.get(r, set())):
for other in list(self.hub.room_manager.get_room_members(r)):
s = self.hub.session_manager.sessions.get(other)
ph = s.get("peer") if s else None
if isinstance(ph, (bytes, bytearray)) and bytes(ph) == target_hash:
s.get("rooms", set()).discard(r)
self.hub.rooms.get(r, set()).discard(other)
self.hub.room_manager.get_room_members(r).discard(other)
if self.hub.identity is not None:
self._emit_error(
outgoing,
@@ -805,14 +805,14 @@ class CommandHandler:
text=f"banned from {r}",
room=r,
)
if r in self.hub.rooms and not self.hub.rooms[r]:
self.hub.rooms.pop(r, None)
if self.hub.room_manager.get_room_members(r) and not self.hub.room_manager.rooms[r]:
pass # room cleanup handled by room_manager
self._emit_notice(outgoing, link, room, f"ban added in {r}")
return True
bans.discard(target_hash)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(outgoing, link, room, f"ban removed in {r}")
return True
@@ -832,7 +832,7 @@ class CommandHandler:
self._emit_notice(outgoing, link, None, f"bad room: {e}")
return True
if not self.hub._is_room_op(r, peer_hash):
if not self.hub.room_manager.is_room_op(r, peer_hash):
if self.hub.identity is not None:
self._emit_error(
outgoing,
@@ -844,7 +844,7 @@ class CommandHandler:
return True
op = parts[2].strip().lower()
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
invited = st.setdefault("invited", {})
if not isinstance(invited, dict):
@@ -852,7 +852,7 @@ class CommandHandler:
st["invited"] = invited
# Drop expired entries before operating.
pruned = self.hub._prune_expired_invites(st)
pruned = self.hub.room_manager.prune_expired_invites(r)
if op == "list":
now = float(time.time())
@@ -869,8 +869,8 @@ class CommandHandler:
items.append(f"{bytes(h).hex()} expires_in={int(exp_f - now)}s")
items.sort()
if pruned:
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(
outgoing,
link,
@@ -955,8 +955,8 @@ class CommandHandler:
ttl = 900.0
exp = float(time.time()) + ttl
invited[target_hash] = exp
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(
outgoing,
link,
@@ -978,8 +978,8 @@ class CommandHandler:
if target_hash in invited:
invited.pop(target_hash, None)
self.hub._touch_room(r)
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.touch_room(r)
self.hub.room_manager.persist_room_state(link, r)
self._emit_notice(outgoing, link, room, f"invite removed in {r}")
return True
+8 -2
View File
@@ -416,7 +416,7 @@ class ResourceManager:
with self.hub._state_lock:
sess = self.hub.session_manager.sessions.get(link)
peer_hash = sess.get("peer") if sess else None
room_members = self.hub.rooms.get(exp.room, set())
room_members = self.hub.room_manager.get_room_members(exp.room)
if peer_hash and room_members:
notice_env = make_envelope(
@@ -494,6 +494,10 @@ class ResourceManager:
"""
Send large payload via Resource.
Returns True if successfully initiated, False otherwise.
Note: This sends the resource envelope immediately, then creates
and advertises the resource. Should only be called when immediate
sending is desired (not when batching messages).
"""
if not self.hub.config.enable_resource_transfer:
return False
@@ -555,7 +559,9 @@ class ResourceManager:
# Create and advertise resource
try:
resource = RNS.Resource(payload, link, advertise=True, auto_compress=False)
resource = RNS.Resource(
payload, link, advertise=True, auto_compress=False
)
with self.hub._state_lock:
self._active_resources.setdefault(link, set()).add(resource)
+702
View File
@@ -0,0 +1,702 @@
"""Room management for RRCD hub.
This module handles all room-related functionality including:
- Room membership tracking
- Room state (modes, topic, permissions)
- Room registry persistence to TOML
- Permission management (ops, voiced, bans)
- Invite tracking with expiration
"""
from __future__ import annotations
import logging
import os
import threading
import time
from typing import TYPE_CHECKING, Any
import RNS
if TYPE_CHECKING:
from .service import HubService
class RoomManager:
"""Manages room memberships, state, permissions, and registry persistence."""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = logging.getLogger("rrcd.rooms")
# Room memberships: room name -> set of links in that room
self.rooms: dict[str, set[RNS.Link]] = {}
# Room state (hub-local conventions; no new on-wire message types).
# _room_state holds active in-memory state (and registered state for empty rooms).
# _room_registry holds registered rooms loaded from config.
self._room_state: dict[str, dict[str, Any]] = {}
self._room_registry: dict[str, dict[str, Any]] = {}
self._room_registry_write_lock = threading.Lock()
def clear_all(self) -> None:
"""Clear all room state. Called during hub shutdown."""
self.rooms.clear()
self._room_state.clear()
self._room_registry.clear()
def get_room_members(self, room: str) -> set[RNS.Link]:
"""Get set of links currently in a room."""
return self.rooms.get(room, set())
def add_member(self, room: str, link: RNS.Link, *, founder: bytes | None = None) -> None:
"""Add a link to a room, creating the room if needed."""
if room not in self.rooms:
self.rooms[room] = set()
self._room_state_ensure(room, founder=founder)
self.rooms.setdefault(room, set()).add(link)
def remove_member(self, room: str, link: RNS.Link) -> None:
"""Remove a link from a room, cleaning up empty rooms."""
if room in self.rooms:
self.rooms[room].discard(link)
if not self.rooms[room]:
self.rooms.pop(room, None)
st = self._room_state_get(room)
# Clean up unregistered empty rooms
if st is not None and not st.get("registered"):
self._room_state.pop(room, None)
def remove_member_from_all(self, link: RNS.Link) -> int:
"""Remove a link from all rooms. Returns number of rooms left."""
rooms_to_remove = [r for r, links in self.rooms.items() if link in links]
for room in rooms_to_remove:
self.remove_member(room, link)
return len(rooms_to_remove)
def get_member_rooms(self, link: RNS.Link) -> list[str]:
"""Get list of rooms a link is currently in."""
return [room for room, links in self.rooms.items() if link in links]
def get_stats(self) -> dict[str, Any]:
"""Get room statistics for hub stats."""
rooms_total = len(self.rooms)
memberships = sum(len(v) for v in self.rooms.values())
top_rooms = sorted(
((room, len(links)) for room, links in self.rooms.items()),
key=lambda x: (-x[1], x[0]),
)[:5]
return {
"rooms_total": rooms_total,
"memberships": memberships,
"top_rooms": top_rooms,
}
# Room state management
def _room_state_get(self, room: str) -> dict[str, Any] | None:
"""Get room state dict if it exists."""
return self._room_state.get(room)
def _room_state_ensure(
self, room: str, *, founder: bytes | None = None
) -> dict[str, Any]:
"""Ensure room state exists, creating from registry or defaults."""
st = self._room_state.get(room)
if st is not None:
if st.get("founder") is None and founder is not None:
st["founder"] = founder
st.setdefault("ops", set()).add(founder)
return st
# Load from registry if registered
if room in self._room_registry:
base = self._room_registry[room]
invited = base.get("invited")
invited_dict: dict[bytes, float] = {}
if isinstance(invited, dict):
for k, v in invited.items():
if isinstance(k, (bytes, bytearray)):
try:
invited_dict[bytes(k)] = float(v)
except Exception:
continue
st = {
"founder": base.get("founder"),
"registered": True,
"topic": base.get("topic"),
"moderated": bool(base.get("moderated", False)),
"invite_only": bool(base.get("invite_only", False)),
"topic_ops_only": bool(base.get("topic_ops_only", False)),
"no_outside_msgs": bool(base.get("no_outside_msgs", False)),
"private": bool(base.get("private", False)),
"key": base.get("key"),
"ops": set(base.get("ops", set())),
"voiced": set(base.get("voiced", set())),
"bans": set(base.get("bans", set())),
"invited": invited_dict,
"last_used_ts": base.get("last_used_ts"),
}
self._room_state[room] = st
return st
# Create new unregistered room
st = {
"founder": founder,
"registered": False,
"topic": None,
"moderated": False,
"invite_only": False,
"topic_ops_only": False,
"no_outside_msgs": False,
"private": False,
"key": None,
"ops": set([founder]) if founder is not None else set(),
"voiced": set(),
"bans": set(),
"invited": {},
"last_used_ts": None,
}
self._room_state[room] = st
return st
def touch_room(self, room: str) -> None:
"""Update last_used_ts for a room."""
try:
st = self._room_state_ensure(room)
ts = float(time.time())
st["last_used_ts"] = ts
reg = self._room_registry.get(room)
if isinstance(reg, dict):
reg["last_used_ts"] = ts
except Exception:
pass
# Room modes and permissions
def get_room_modes(self, room: str) -> dict[str, Any]:
"""Get dict of room mode flags."""
st = self._room_state_ensure(room)
registered = bool(st.get("registered", False))
moderated = bool(st.get("moderated", False))
invite_only = bool(st.get("invite_only", False))
topic_ops_only = bool(st.get("topic_ops_only", False))
no_outside_msgs = bool(st.get("no_outside_msgs", False))
private = bool(st.get("private", False))
key = st.get("key")
has_key = isinstance(key, str) and bool(key)
return {
"registered": registered,
"moderated": moderated,
"invite_only": invite_only,
"topic_ops_only": topic_ops_only,
"no_outside_msgs": no_outside_msgs,
"private": private,
"has_key": has_key,
}
def get_room_mode_string(self, room: str) -> str:
"""Get IRC-style mode string for a room."""
m = self.get_room_modes(room)
flags: list[str] = []
# Keep roughly IRC-ish order.
if m.get("invite_only"):
flags.append("i")
if m.get("has_key"):
flags.append("k")
if m.get("moderated"):
flags.append("m")
if m.get("no_outside_msgs"):
flags.append("n")
if m.get("private"):
flags.append("p")
if m.get("registered"):
flags.append("r")
if m.get("topic_ops_only"):
flags.append("t")
return "+" + "".join(flags) if flags else "(none)"
def broadcast_room_mode(
self, room: str, outgoing: list[tuple[RNS.Link, bytes]] | None = None
) -> None:
"""Broadcast current room mode to all members."""
mode_txt = self.get_room_mode_string(room)
recipients = list(self.get_room_members(room))
for other in recipients:
self.hub._emit_notice(
outgoing, other, room, f"mode for {room} is now: {mode_txt}"
)
def is_room_moderated(self, room: str) -> bool:
"""Check if room is moderated."""
st = self._room_state_ensure(room)
return bool(st.get("moderated", False))
def is_room_op(self, room: str, peer_hash: bytes | None) -> bool:
"""Check if peer is a room operator."""
if peer_hash is None:
return False
if self.hub._is_server_op(peer_hash):
return True
st = self._room_state_ensure(room)
founder = st.get("founder")
if isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash:
return True
ops = st.get("ops")
return isinstance(ops, set) and peer_hash in ops
def is_room_voiced(self, room: str, peer_hash: bytes | None) -> bool:
"""Check if peer has voice in room."""
if peer_hash is None:
return False
if self.is_room_op(room, peer_hash):
return True
st = self._room_state_ensure(room)
voiced = st.get("voiced")
return isinstance(voiced, set) and peer_hash in voiced
def is_room_banned(self, room: str, peer_hash: bytes | None) -> bool:
"""Check if peer is banned from room."""
if peer_hash is None:
return False
st = self._room_state_ensure(room)
bans = st.get("bans")
return isinstance(bans, set) and peer_hash in bans
# Invite management
def is_invited(self, room: str, peer_hash: bytes) -> bool:
"""Check if peer has a valid (non-expired) invite."""
st = self._room_state_ensure(room)
inv = st.get("invited")
if not isinstance(inv, dict) or not inv:
return False
now = float(time.time())
exp = inv.get(peer_hash)
try:
exp_f = float(exp) if exp is not None else 0.0
except Exception:
exp_f = 0.0
if exp_f <= now:
inv.pop(peer_hash, None)
return False
return True
def prune_expired_invites(self, room: str) -> bool:
"""Remove expired invites from a room. Returns True if any were removed."""
st = self._room_state_ensure(room)
inv = st.get("invited")
if not isinstance(inv, dict) or not inv:
return False
now = float(time.time())
removed_any = False
for h, exp in list(inv.items()):
try:
exp_f = float(exp)
except Exception:
exp_f = 0.0
if exp_f <= now:
inv.pop(h, None)
removed_any = True
return removed_any
# Room registry persistence
def load_registry_from_path(
self, path: str, *, invite_timeout_s: float
) -> tuple[dict[str, dict[str, Any]], str | None]:
"""Load room registry from TOML file. Returns (registry, error_msg)."""
if not path or not os.path.exists(path):
return {}, None
try:
from tomlkit import parse # type: ignore
except ImportError:
return {}, "missing dependency tomlkit"
try:
with open(path, encoding="utf-8") as f:
doc = parse(f.read())
except Exception as e:
return {}, f"parse error: {e}"
rooms_section = doc.get("rooms")
if not isinstance(rooms_section, dict):
return {}, None
registry: dict[str, dict[str, Any]] = {}
now = float(time.time())
for room_name, room_data in rooms_section.items():
if not isinstance(room_data, dict):
continue
founder = room_data.get("founder")
if isinstance(founder, str):
try:
founder = bytes.fromhex(founder.strip().lower().removeprefix("0x"))
except Exception:
founder = None
topic = room_data.get("topic")
if not isinstance(topic, str):
topic = None
moderated = bool(room_data.get("moderated", False))
invite_only = bool(room_data.get("invite_only", False))
topic_ops_only = bool(room_data.get("topic_ops_only", False))
no_outside_msgs = bool(room_data.get("no_outside_msgs", False))
private = bool(room_data.get("private", False))
key = room_data.get("key")
if not isinstance(key, str):
key = None
operators = room_data.get("operators", [])
ops: set[bytes] = set()
if isinstance(operators, list):
for op in operators:
if isinstance(op, str):
try:
ops.add(bytes.fromhex(op.strip().lower().removeprefix("0x")))
except Exception:
continue
voiced_list = room_data.get("voiced", [])
voiced: set[bytes] = set()
if isinstance(voiced_list, list):
for v in voiced_list:
if isinstance(v, str):
try:
voiced.add(bytes.fromhex(v.strip().lower().removeprefix("0x")))
except Exception:
continue
bans_list = room_data.get("bans", [])
bans: set[bytes] = set()
if isinstance(bans_list, list):
for b in bans_list:
if isinstance(b, str):
try:
bans.add(bytes.fromhex(b.strip().lower().removeprefix("0x")))
except Exception:
continue
invited_dict = room_data.get("invited", {})
invited: dict[bytes, float] = {}
if isinstance(invited_dict, dict):
for h, exp in invited_dict.items():
if isinstance(h, str):
try:
h_bytes = bytes.fromhex(h.strip().lower().removeprefix("0x"))
exp_f = float(exp)
if exp_f > now:
invited[h_bytes] = exp_f
except Exception:
continue
last_used_ts = room_data.get("last_used_ts")
try:
last_used_ts = float(last_used_ts) if last_used_ts is not None else None
except Exception:
last_used_ts = None
registry[room_name] = {
"founder": founder,
"topic": topic,
"moderated": moderated,
"invite_only": invite_only,
"topic_ops_only": topic_ops_only,
"no_outside_msgs": no_outside_msgs,
"private": private,
"key": key,
"ops": ops,
"voiced": voiced,
"bans": bans,
"invited": invited,
"last_used_ts": last_used_ts,
}
return registry, None
def diff_registry_summary(
self, old: dict[str, dict[str, Any]], new: dict[str, dict[str, Any]]
) -> list[str]:
"""Generate human-readable summary of registry changes."""
old_rooms = set(old.keys())
new_rooms = set(new.keys())
added = sorted(new_rooms - old_rooms)
removed = sorted(old_rooms - new_rooms)
lines: list[str] = []
if added:
preview = ", ".join(added[:10])
suffix = "" if len(added) <= 10 else f" (+{len(added) - 10} more)"
lines.append(f"rooms_added={len(added)}: {preview}{suffix}")
if removed:
preview = ", ".join(removed[:10])
suffix = "" if len(removed) <= 10 else f" (+{len(removed) - 10} more)"
lines.append(f"rooms_removed={len(removed)}: {preview}{suffix}")
if not lines:
lines.append(f"rooms_changed=0 (registered_rooms={len(new_rooms)})")
return lines
def get_registry_path_for_writes(self) -> str | None:
"""Get path to room registry file for write operations."""
from .util import expand_path
p = self.hub.config.room_registry_path
if not p:
return None
return expand_path(str(p))
def persist_room_state(self, link: RNS.Link, room: str | None) -> None:
"""Persist room state to registry TOML file."""
if room is None:
return
reg_path = self.get_registry_path_for_writes()
if not reg_path:
return
st = self._room_state_get(room)
if not st or not st.get("registered"):
return
try:
from tomlkit import dumps, parse, table # type: ignore
except Exception:
return
try:
with self._room_registry_write_lock:
file_stat = None
try:
file_stat = os.stat(reg_path)
except Exception:
file_stat = None
with open(reg_path, encoding="utf-8") as f:
doc = parse(f.read())
rooms = doc.get("rooms")
if rooms is None:
rooms = table()
doc["rooms"] = rooms
room_tbl = rooms.get(room)
if room_tbl is None:
room_tbl = table()
rooms[room] = room_tbl
founder = st.get("founder")
if isinstance(founder, (bytes, bytearray)):
room_tbl["founder"] = bytes(founder).hex()
topic = st.get("topic")
if isinstance(topic, str) and topic.strip():
room_tbl["topic"] = topic
else:
if "topic" in room_tbl:
del room_tbl["topic"]
room_tbl["moderated"] = bool(st.get("moderated", False))
room_tbl["invite_only"] = bool(st.get("invite_only", False))
room_tbl["topic_ops_only"] = bool(st.get("topic_ops_only", False))
room_tbl["no_outside_msgs"] = bool(st.get("no_outside_msgs", False))
key = st.get("key")
if isinstance(key, str) and key:
room_tbl["key"] = key
else:
if "key" in room_tbl:
del room_tbl["key"]
last_used_ts = st.get("last_used_ts")
if last_used_ts is None:
last_used_ts = float(time.time())
try:
room_tbl["last_used_ts"] = float(last_used_ts)
except Exception:
room_tbl["last_used_ts"] = float(time.time())
ops = st.get("ops")
if isinstance(ops, set):
room_tbl["operators"] = sorted(
bytes(x).hex() for x in ops if isinstance(x, (bytes, bytearray))
)
voiced = st.get("voiced")
if isinstance(voiced, set):
room_tbl["voiced"] = sorted(
bytes(x).hex()
for x in voiced
if isinstance(x, (bytes, bytearray))
)
bans = st.get("bans")
if isinstance(bans, set):
room_tbl["bans"] = sorted(
bytes(x).hex()
for x in bans
if isinstance(x, (bytes, bytearray))
)
invited = st.get("invited")
if isinstance(invited, dict):
inv_tbl = {}
now = float(time.time())
for h, exp in invited.items():
if not isinstance(h, (bytes, bytearray)):
continue
try:
exp_f = float(exp)
except Exception:
continue
if exp_f > now:
inv_tbl[bytes(h).hex()] = exp_f
room_tbl["invited"] = inv_tbl
new_text = dumps(doc)
with open(reg_path, "w", encoding="utf-8") as f:
f.write(new_text)
if file_stat is not None:
try:
os.chmod(reg_path, file_stat.st_mode)
except Exception:
pass
except Exception as e:
self.hub._notice_to(link, room, f"room config persist failed: {e}")
def delete_room_from_registry(self, link: RNS.Link, room: str) -> None:
"""Remove a room from the registry TOML file."""
reg_path = self.get_registry_path_for_writes()
if not reg_path:
return
try:
from tomlkit import dumps, parse # type: ignore
except Exception:
return
try:
with self._room_registry_write_lock:
file_stat = None
try:
file_stat = os.stat(reg_path)
except Exception:
file_stat = None
with open(reg_path, encoding="utf-8") as f:
doc = parse(f.read())
rooms = doc.get("rooms")
if isinstance(rooms, dict) and room in rooms:
try:
del rooms[room]
except Exception:
rooms.pop(room, None)
new_text = dumps(doc)
with open(reg_path, "w", encoding="utf-8") as f:
f.write(new_text)
if file_stat is not None:
try:
os.chmod(reg_path, file_stat.st_mode)
except Exception:
pass
except Exception as e:
self.hub._notice_to(link, room, f"room unregister persist failed: {e}")
def prune_unused_registered_rooms(
self, prune_after_s: float, started_wall_time: float
) -> list[str]:
"""
Prune registered rooms that haven't been used recently.
Returns list of pruned room names.
"""
now = float(time.time())
rooms_to_prune: list[str] = []
for room, reg in list(self._room_registry.items()):
# Skip active rooms
if room in self.rooms and self.rooms.get(room):
continue
last_used = reg.get("last_used_ts")
try:
last_used = float(last_used) if last_used is not None else None
except Exception:
last_used = None
if last_used is None:
# Never-used rooms are eligible after prune_after from process start
last_used = started_wall_time
if (now - float(last_used)) < prune_after_s:
continue
# Prune in-memory
self._room_registry.pop(room, None)
self._room_state.pop(room, None)
rooms_to_prune.append(room)
return rooms_to_prune
def merge_registry_into_state(self, registry: dict[str, dict[str, Any]]) -> None:
"""
Merge registry into live room state.
Updates in-memory state for active rooms with registry data.
"""
for r, st in list(self._room_state.items()):
if not isinstance(st, dict):
continue
reg = registry.get(r)
if reg is None:
# If a room was unregistered on disk, reflect that
if st.get("registered"):
st["registered"] = False
continue
st["registered"] = True
founder = reg.get("founder")
if isinstance(founder, (bytes, bytearray)):
st["founder"] = bytes(founder)
topic = reg.get("topic")
if isinstance(topic, str):
st["topic"] = topic
st["moderated"] = bool(reg.get("moderated", False))
st["invite_only"] = bool(reg.get("invite_only", False))
st["topic_ops_only"] = bool(reg.get("topic_ops_only", False))
st["no_outside_msgs"] = bool(reg.get("no_outside_msgs", False))
st["private"] = bool(reg.get("private", False))
key = reg.get("key")
if isinstance(key, str):
st["key"] = key
ops = reg.get("ops")
if isinstance(ops, set):
st["ops"] = set(ops)
voiced = reg.get("voiced")
if isinstance(voiced, set):
st["voiced"] = set(voiced)
bans = reg.get("bans")
if isinstance(bans, set):
st["bans"] = set(bans)
invited = reg.get("invited")
if isinstance(invited, dict):
st["invited"] = dict(invited)
last_used_ts = reg.get("last_used_ts")
if last_used_ts is not None:
st["last_used_ts"] = last_used_ts
+56 -39
View File
@@ -5,6 +5,11 @@ from typing import TYPE_CHECKING, Any
import RNS
class OutgoingList(list):
"""Custom list that allows attaching callback attributes."""
pass
from .codec import decode, encode
from .constants import (
B_HELLO_CAPS,
@@ -19,6 +24,7 @@ from .constants import (
K_ROOM,
K_SRC,
K_T,
RES_KIND_MOTD,
T_HELLO,
T_JOIN,
T_JOINED,
@@ -336,6 +342,22 @@ class MessageRouter:
peer_hash=peer_hash,
motd=self.hub.config.greeting,
)
# Send MOTD after WELCOME (outside of outgoing queue to enable resource transfer)
# The outgoing queue will be sent first, then this callback will send the MOTD
if self.hub.config.greeting:
def send_motd():
self.hub._send_text_smart(
link,
msg_type=T_NOTICE,
text=self.hub.config.greeting,
room=None,
kind=RES_KIND_MOTD,
)
# Store callback to be executed after outgoing packets are sent
if not hasattr(outgoing, '_post_send_callbacks'):
outgoing._post_send_callbacks = [] # type: ignore
outgoing._post_send_callbacks.append(send_motd) # type: ignore
def _handle_re_hello(
self,
@@ -362,12 +384,7 @@ class MessageRouter:
# Remove this link from all room membership sets and prune empties.
for r in old_rooms:
self.hub.rooms.get(r, set()).discard(link)
if r in self.hub.rooms and not self.hub.rooms[r]:
self.hub.rooms.pop(r, None)
st = self.hub._room_state_get(r)
if st is not None and not st.get("registered"):
self.hub._room_state.pop(r, None)
self.hub.room_manager.remove_member(r, link)
new_nick = None
@@ -448,15 +465,15 @@ class MessageRouter:
return
# If room is registered, load its state now.
if r in self.hub._room_registry:
self.hub._room_state_ensure(r)
if r in self.hub.room_manager._room_registry:
self.hub.room_manager._room_state_ensure(r)
st = self.hub._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
# +i invite-only
if bool(st.get("invite_only", False)):
is_invited = self.hub._is_invited(st, peer_hash)
if not self.hub._is_room_op(r, peer_hash) and not is_invited:
is_invited = self.hub.room_manager.is_invited(r, peer_hash)
if not self.hub.room_manager.is_room_op(r, peer_hash) and not is_invited:
if self.hub.identity is not None:
self.hub._emit_error(
outgoing,
@@ -470,8 +487,8 @@ class MessageRouter:
# +k key/password (JOIN body must be the key string)
key = st.get("key")
if isinstance(key, str) and key:
is_invited = self.hub._is_invited(st, peer_hash)
if not self.hub._is_room_op(r, peer_hash) and not is_invited:
is_invited = self.hub.room_manager.is_invited(r, peer_hash)
if not self.hub.room_manager.is_room_op(r, peer_hash) and not is_invited:
provided = body if isinstance(body, str) else None
if provided != key:
if self.hub.identity is not None:
@@ -485,7 +502,7 @@ class MessageRouter:
return
# Room bans are room-local and apply to JOIN.
if self.hub._is_room_banned(r, peer_hash):
if self.hub.room_manager.is_room_banned(r, peer_hash):
if self.hub.identity is not None:
self.hub._emit_error(
outgoing,
@@ -497,12 +514,12 @@ class MessageRouter:
return
# If the room doesn't exist yet (in-memory), the first joiner is the founder.
if r not in self.hub.rooms:
self.hub.rooms[r] = set()
self.hub._room_state_ensure(r, founder=peer_hash)
if not self.hub.room_manager.get_room_members(r):
pass # room created by add_member
self.hub.room_manager._room_state_ensure(r, founder=peer_hash)
sess["rooms"].add(r)
self.hub.rooms.setdefault(r, set()).add(link)
self.hub.room_manager.add_member(r, link)
self.log.info(
"JOIN peer=%s nick=%r room=%s link_id=%s",
@@ -512,12 +529,12 @@ class MessageRouter:
self.hub._fmt_link_id(link),
)
self.hub._touch_room(r)
self.hub.room_manager.touch_room(r)
joined_body = None
if self.hub.config.include_joined_member_list:
members: list[bytes] = []
for member_link in self.hub.rooms.get(r, set()):
for member_link in self.hub.room_manager.get_room_members(r):
s = self.hub.session_manager.sessions.get(member_link)
ph = s.get("peer") if s else None
if isinstance(ph, (bytes, bytearray)):
@@ -535,14 +552,14 @@ class MessageRouter:
if isinstance(inv, dict) and peer_hash in inv:
inv.pop(peer_hash, None)
if bool(st.get("registered")):
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.persist_room_state(link, r)
except Exception:
pass
try:
registered = bool(st.get("registered", False))
topic = st.get("topic") if isinstance(st.get("topic"), str) else None
mode_txt = self.hub._room_mode_string(r)
mode_txt = self.hub.room_manager.get_room_mode_string(r)
topic_txt = topic if topic else "(none)"
reg_txt = "registered" if registered else "unregistered"
self.hub._emit_notice(
@@ -586,23 +603,23 @@ class MessageRouter:
return
sess["rooms"].discard(r)
if r in self.hub.rooms:
self.hub.rooms[r].discard(link)
if not self.hub.rooms[r]:
self.hub.rooms.pop(r, None)
st = self.hub._room_state_get(r)
if self.hub.room_manager.get_room_members(r):
self.hub.room_manager.remove_member(r, link)
if not self.hub.room_manager.get_room_members(r):
self.hub.room_manager.remove_member(r, link)
st = self.hub.room_manager._room_state_get(r)
if st is not None:
self.hub._touch_room(r)
self.hub.room_manager.touch_room(r)
if st.get("registered"):
self.hub._persist_room_state_to_registry(link, r)
self.hub.room_manager.persist_room_state(link, r)
if st is not None and not st.get("registered"):
self.hub._room_state.pop(r, None)
self.hub.room_manager._room_state.pop(r, None)
# Per spec: acknowledge PART with PARTED.
parted_body = None
if self.hub.config.include_joined_member_list:
members: list[bytes] = []
for member_link in self.hub.rooms.get(r, set()):
for member_link in self.hub.room_manager.get_room_members(r):
s = self.hub.session_manager.sessions.get(member_link)
ph = s.get("peer") if s else None
if isinstance(ph, (bytes, bytearray)):
@@ -701,10 +718,10 @@ class MessageRouter:
# +n (no outside messages): when enabled, require membership.
# When disabled (-n), allow sending to existing/registered rooms.
st = None
if r in self.hub._room_registry:
st = self.hub._room_state_ensure(r)
elif r in self.hub.rooms:
st = self.hub._room_state_ensure(r)
if r in self.hub.room_manager._room_registry:
st = self.hub.room_manager._room_state_ensure(r)
elif self.hub.room_manager.get_room_members(r):
st = self.hub.room_manager._room_state_ensure(r)
if st is None:
if self.hub.identity is not None:
@@ -729,7 +746,7 @@ class MessageRouter:
return
# Per-room moderation: bans and moderated mode.
if self.hub._is_room_banned(r, peer_hash):
if self.hub.room_manager.is_room_banned(r, peer_hash):
if self.hub.identity is not None:
self.hub._emit_error(
outgoing,
@@ -739,7 +756,7 @@ class MessageRouter:
room=r,
)
return
if self.hub._room_moderated(r) and not self.hub._is_room_voiced(r, peer_hash):
if self.hub.room_manager.is_room_moderated(r) and not self.hub.room_manager.is_room_voiced(r, peer_hash):
if self.hub.identity is not None:
self.hub._emit_error(
outgoing,
@@ -783,7 +800,7 @@ class MessageRouter:
env[K_NICK] = n
payload = encode(env)
for other in list(self.hub.rooms.get(r, set())):
for other in list(self.hub.room_manager.get_room_members(r)):
self.hub._queue_payload(outgoing, other, payload)
if self.log.isEnabledFor(logging.DEBUG):
@@ -793,7 +810,7 @@ class MessageRouter:
self.hub._fmt_hash(peer_hash),
sess.get("nick"),
r,
len(self.hub.rooms.get(r, set())),
len(self.hub.room_manager.get_room_members(r)),
type(body).__name__,
)
+120 -643
View File
@@ -27,7 +27,8 @@ from .constants import (
from .envelope import make_envelope
from .logging_config import configure_logging
from .resources import ResourceManager
from .router import MessageRouter
from .rooms import RoomManager
from .router import MessageRouter, OutgoingList
from .session import SessionManager
from .util import expand_path
@@ -55,22 +56,17 @@ class HubService:
# Resource manager for file/data transfers
self.resource_manager = ResourceManager(self)
# Room manager for room memberships and permissions
self.room_manager = RoomManager(self)
self.identity: RNS.Identity | None = None
self.destination: RNS.Destination | None = None
self.rooms: dict[str, set[RNS.Link]] = {}
self._trusted: set[bytes] = set()
self._banned: set[bytes] = set()
# Room state (hub-local conventions; no new on-wire message types).
# _room_state holds active in-memory state (and registered state for empty rooms).
# _room_registry holds registered rooms loaded from config.
self._room_state: dict[str, dict[str, Any]] = {}
self._room_registry: dict[str, dict[str, Any]] = {}
self._room_registry_write_lock = threading.Lock()
self._prune_thread: threading.Thread | None = None
self._ping_thread: threading.Thread | None = None
@@ -221,12 +217,6 @@ class HubService:
self._fmt_link_id(link),
)
# The hub MOTD (message of the day) is delivered after WELCOME.
if g:
self._send_text_smart(
link, msg_type=T_NOTICE, text=g, room=None, outgoing=outgoing, kind=RES_KIND_MOTD
)
def _inc(self, key: str, delta: int = 1) -> None:
try:
with self._state_lock:
@@ -267,21 +257,51 @@ class HubService:
# If it fits, send normally
if self._packet_would_fit(link, payload):
self.log.debug(
"Text fits in packet link_id=%s bytes=%s",
self._fmt_link_id(link),
len(payload),
)
if outgoing is None:
self._send(link, env)
else:
self._queue_env(outgoing, link, env)
return
self.log.debug(
"Text too large for packet link_id=%s bytes=%s mtu_check_failed=True",
self._fmt_link_id(link),
len(payload),
)
# Too large for packet - try resource if enabled and type is NOTICE
if (
# Only use resources when NOT batching (outgoing=None), since resource
# creation happens immediately and would race with queued packets.
text_bytes = text.encode(encoding)
can_use_resource = (
self.config.enable_resource_transfer
and msg_type == T_NOTICE
and len(text.encode(encoding)) <= self.config.max_resource_bytes
):
text_bytes = text.encode(encoding)
and outgoing is None
and len(text_bytes) <= self.config.max_resource_bytes
)
self.log.debug(
"Resource check: enabled=%s type_is_notice=%s not_batching=%s size_ok=%s/%s",
self.config.enable_resource_transfer,
msg_type == T_NOTICE,
outgoing is None,
len(text_bytes),
self.config.max_resource_bytes,
)
if can_use_resource:
self.log.debug(
"Attempting to send via resource link_id=%s kind=%s",
self._fmt_link_id(link),
kind if kind is not None else RES_KIND_NOTICE,
)
resource_kind = kind if kind is not None else RES_KIND_NOTICE
if self._send_via_resource(
if self.resource_manager.send_via_resource(
link,
kind=resource_kind,
payload=text_bytes,
@@ -295,9 +315,19 @@ class HubService:
len(text),
)
return
else:
self.log.warning(
"Resource send failed, falling back to chunks link_id=%s",
self._fmt_link_id(link),
)
# Fall back to chunking for NOTICE
if msg_type == T_NOTICE:
self.log.debug(
"Falling back to chunking link_id=%s outgoing_is_none=%s",
self._fmt_link_id(link),
outgoing is None,
)
if outgoing is None:
outgoing = []
self._queue_notice_chunks(outgoing, link, room=room, text=text)
@@ -444,7 +474,7 @@ class HubService:
with self._state_lock:
links = self.session_manager.clear_all()
self.rooms.clear()
self.room_manager.clear_all()
self.resource_manager.clear_all()
for link in links:
@@ -562,219 +592,6 @@ class HubService:
)
return changed
def _load_room_registry_from_path(
self,
reg_path: str,
*,
invite_timeout_s: float | None = None,
) -> tuple[dict[str, dict[str, Any]], str | None]:
if not reg_path:
return {}, "room_registry_path is empty"
if not os.path.exists(reg_path):
return {}, f"room registry file not found: {reg_path}"
try:
from tomlkit import parse # type: ignore
except Exception:
return {}, "missing dependency tomlkit"
try:
with open(reg_path, encoding="utf-8") as f:
doc = parse(f.read())
except Exception as e:
return {}, f"failed to parse rooms registry: {e}"
rooms = doc.get("rooms")
if rooms is None:
return {}, None
if not isinstance(rooms, dict):
return {}, "rooms registry: [rooms] must be a table"
def _parse_list(cfg: dict[str, Any], name: str) -> set[bytes]:
out: set[bytes] = set()
lst = cfg.get(name)
if isinstance(lst, list):
for item in lst:
if not isinstance(item, str) or not item.strip():
continue
try:
out.add(self._parse_identity_hash(item))
except Exception:
continue
return out
registry: dict[str, dict[str, Any]] = {}
for raw_room, raw_cfg in rooms.items():
if not isinstance(raw_room, str):
continue
try:
room = self._norm_room(raw_room)
except Exception:
continue
if not isinstance(raw_cfg, dict):
continue
founder_hex = raw_cfg.get("founder")
founder = None
if isinstance(founder_hex, str) and founder_hex.strip():
try:
founder = self._parse_identity_hash(founder_hex)
except Exception:
founder = None
topic = raw_cfg.get("topic")
if not isinstance(topic, str) or not topic.strip():
topic = None
moderated = bool(raw_cfg.get("moderated", False))
invite_only = bool(raw_cfg.get("invite_only", False))
topic_ops_only = bool(raw_cfg.get("topic_ops_only", False))
no_outside_msgs = bool(raw_cfg.get("no_outside_msgs", False))
key = raw_cfg.get("key")
if not isinstance(key, str) or not key:
key = None
last_used_ts = raw_cfg.get("last_used_ts")
try:
last_used_ts = float(last_used_ts) if last_used_ts is not None else None
except Exception:
last_used_ts = None
ops = _parse_list(raw_cfg, "operators")
voiced = _parse_list(raw_cfg, "voiced")
bans = _parse_list(raw_cfg, "bans")
invited: dict[bytes, float] = {}
raw_inv = raw_cfg.get("invited")
now = float(time.time())
ttl_src = invite_timeout_s
if ttl_src is None:
ttl_src = self.config.room_invite_timeout_s
ttl = float(ttl_src) if ttl_src else 0.0
if ttl <= 0:
ttl = 900.0
# New format: invited is a table mapping hex->expiry_ts
if isinstance(raw_inv, dict):
for k, v in raw_inv.items():
if not isinstance(k, str) or not k.strip():
continue
try:
h = self._parse_identity_hash(k)
except Exception:
continue
try:
exp = float(v)
except Exception:
continue
if exp > now:
invited[h] = exp
# Back-compat: invited as a list of identity hashes => grant ttl from now
elif isinstance(raw_inv, list):
for item in raw_inv:
if not isinstance(item, str) or not item.strip():
continue
try:
h = self._parse_identity_hash(item)
except Exception:
continue
invited[h] = now + ttl
if founder is not None:
ops.add(founder)
registry[room] = {
"founder": founder,
"registered": True,
"topic": topic,
"moderated": moderated,
"invite_only": invite_only,
"topic_ops_only": topic_ops_only,
"no_outside_msgs": no_outside_msgs,
"key": key,
"ops": ops,
"voiced": voiced,
"bans": bans,
"invited": invited,
"last_used_ts": last_used_ts,
}
return registry, None
def _diff_room_registry_summary(
self, old: dict[str, dict[str, Any]], new: dict[str, dict[str, Any]]
) -> list[str]:
old_rooms = set(old.keys())
new_rooms = set(new.keys())
added = sorted(new_rooms - old_rooms)
removed = sorted(old_rooms - new_rooms)
lines: list[str] = []
if added:
preview = ", ".join(added[:10])
suffix = "" if len(added) <= 10 else f" (+{len(added) - 10} more)"
lines.append(f"rooms_added={len(added)}: {preview}{suffix}")
if removed:
preview = ", ".join(removed[:10])
suffix = "" if len(removed) <= 10 else f" (+{len(removed) - 10} more)"
lines.append(f"rooms_removed={len(removed)}: {preview}{suffix}")
if not lines:
lines.append(f"rooms_changed=0 (registered_rooms={len(new_rooms)})")
return lines
def _room_modes(self, room: str) -> dict[str, Any]:
st = self._room_state_ensure(room)
registered = bool(st.get("registered", False))
moderated = bool(st.get("moderated", False))
invite_only = bool(st.get("invite_only", False))
topic_ops_only = bool(st.get("topic_ops_only", False))
no_outside_msgs = bool(st.get("no_outside_msgs", False))
private = bool(st.get("private", False))
key = st.get("key")
has_key = isinstance(key, str) and bool(key)
return {
"registered": registered,
"moderated": moderated,
"invite_only": invite_only,
"topic_ops_only": topic_ops_only,
"no_outside_msgs": no_outside_msgs,
"private": private,
"has_key": has_key,
}
def _room_mode_string(self, room: str) -> str:
m = self._room_modes(room)
flags: list[str] = []
# Keep roughly IRC-ish order.
if m.get("invite_only"):
flags.append("i")
if m.get("has_key"):
flags.append("k")
if m.get("moderated"):
flags.append("m")
if m.get("no_outside_msgs"):
flags.append("n")
if m.get("private"):
flags.append("p")
if m.get("registered"):
flags.append("r")
if m.get("topic_ops_only"):
flags.append("t")
return "+" + "".join(flags) if flags else "(none)"
def _broadcast_room_mode(
self, room: str, outgoing: list[tuple[RNS.Link, bytes]] | None = None
) -> None:
mode_txt = self._room_mode_string(room)
with self._state_lock:
recipients = list(self.rooms.get(room, set()))
for other in recipients:
self._emit_notice(
outgoing, other, room, f"mode for {room} is now: {mode_txt}"
)
def _ensure_worker_threads(self) -> None:
# Announce loop
if self._announce_thread is None or not self._announce_thread.is_alive():
@@ -829,7 +646,7 @@ class HubService:
old_cfg = self.config
old_trusted = set(self._trusted)
old_banned = set(self._banned)
old_registry = dict(self._room_registry)
old_registry = dict(self.room_manager._room_registry)
# Stage config parse
try:
@@ -865,7 +682,7 @@ class HubService:
if new_cfg.room_registry_path
else ""
)
new_registry, reg_err = self._load_room_registry_from_path(
new_registry, reg_err = self.room_manager.load_registry_from_path(
reg_path,
invite_timeout_s=new_cfg.room_invite_timeout_s,
)
@@ -878,57 +695,11 @@ class HubService:
self.config = new_cfg
self._trusted = new_trusted
self._banned = new_banned
self._room_registry = new_registry
self.room_manager._room_registry = new_registry
# Merge registry into live per-room state (for active rooms).
# This makes /reload take effect immediately for existing members.
for r, st in list(self._room_state.items()):
if not isinstance(st, dict):
continue
reg = self._room_registry.get(r)
if reg is None:
# If a room was unregistered on disk, reflect that.
if st.get("registered"):
st["registered"] = False
continue
st["registered"] = True
founder = reg.get("founder")
if isinstance(founder, (bytes, bytearray)):
st["founder"] = bytes(founder)
# Simple scalar fields
for key in (
"topic",
"moderated",
"invite_only",
"topic_ops_only",
"no_outside_msgs",
"key",
"last_used_ts",
):
if key in reg:
st[key] = reg.get(key)
# Set fields
for key in ("ops", "voiced", "bans"):
v = reg.get(key)
if isinstance(v, set):
st[key] = set(v)
# Invites (dict[bytes, float])
inv = reg.get("invited")
if isinstance(inv, dict):
st["invited"] = dict(inv)
# Ensure founder stays op.
founder_st = st.get("founder")
if isinstance(founder_st, (bytes, bytearray)):
ops = st.setdefault("ops", set())
if isinstance(ops, set):
ops.add(bytes(founder_st))
self.room_manager.merge_registry_into_state(new_registry)
self._ensure_worker_threads()
@@ -939,7 +710,7 @@ class HubService:
self.log.exception("Failed to reconfigure logging")
cfg_changes = self._diff_config_summary(old_cfg, new_cfg)
room_changes = self._diff_room_registry_summary(old_registry, new_registry)
room_changes = self.room_manager.diff_registry_summary(old_registry, new_registry)
lines: list[str] = []
lines.append(
@@ -963,160 +734,20 @@ class HubService:
self._emit_notice(outgoing, link, room, "\n".join(lines))
def _room_registry_path_for_writes(self) -> str | None:
p = self.config.room_registry_path
if not p:
return
return expand_path(str(p))
def _load_registered_rooms_from_registry(self) -> None:
reg_path = self._room_registry_path_for_writes()
reg_path = self.room_manager.get_registry_path_for_writes()
if not reg_path:
return
registry, err = self._load_room_registry_from_path(reg_path)
registry, err = self.room_manager.load_registry_from_path(
reg_path, invite_timeout_s=self.config.room_invite_timeout_s
)
if err is not None:
return
self._room_registry = registry
def _room_state_get(self, room: str) -> dict[str, Any] | None:
return self._room_state.get(room)
def _room_state_ensure(
self, room: str, *, founder: bytes | None = None
) -> dict[str, Any]:
st = self._room_state.get(room)
if st is not None:
if st.get("founder") is None and founder is not None:
st["founder"] = founder
st.setdefault("ops", set()).add(founder)
return st
if room in self._room_registry:
base = self._room_registry[room]
invited = base.get("invited")
invited_dict: dict[bytes, float] = {}
if isinstance(invited, dict):
for k, v in invited.items():
if isinstance(k, (bytes, bytearray)):
try:
invited_dict[bytes(k)] = float(v)
except Exception:
continue
st = {
"founder": base.get("founder"),
"registered": True,
"topic": base.get("topic"),
"moderated": bool(base.get("moderated", False)),
"invite_only": bool(base.get("invite_only", False)),
"topic_ops_only": bool(base.get("topic_ops_only", False)),
"no_outside_msgs": bool(base.get("no_outside_msgs", False)),
"private": bool(base.get("private", False)),
"key": base.get("key"),
"ops": set(base.get("ops", set())),
"voiced": set(base.get("voiced", set())),
"bans": set(base.get("bans", set())),
"invited": invited_dict,
"last_used_ts": base.get("last_used_ts"),
}
self._room_state[room] = st
return st
st = {
"founder": founder,
"registered": False,
"topic": None,
"moderated": False,
"invite_only": False,
"topic_ops_only": False,
"no_outside_msgs": False,
"private": False,
"key": None,
"ops": set([founder]) if founder is not None else set(),
"voiced": set(),
"bans": set(),
"invited": {},
"last_used_ts": None,
}
self._room_state[room] = st
return st
def _prune_expired_invites(self, st: dict[str, Any]) -> bool:
inv = st.get("invited")
if not isinstance(inv, dict) or not inv:
return False
now = float(time.time())
removed_any = False
for h, exp in list(inv.items()):
try:
exp_f = float(exp)
except Exception:
exp_f = 0.0
if exp_f <= now:
inv.pop(h, None)
removed_any = True
return removed_any
def _is_invited(self, st: dict[str, Any], peer_hash: bytes) -> bool:
inv = st.get("invited")
if not isinstance(inv, dict) or not inv:
return False
now = float(time.time())
exp = inv.get(peer_hash)
try:
exp_f = float(exp) if exp is not None else 0.0
except Exception:
exp_f = 0.0
if exp_f <= now:
inv.pop(peer_hash, None)
return False
return True
def _touch_room(self, room: str) -> None:
try:
st = self._room_state_ensure(room)
ts = float(time.time())
st["last_used_ts"] = ts
reg = self._room_registry.get(room)
if isinstance(reg, dict):
reg["last_used_ts"] = ts
except Exception:
pass
self.room_manager._room_registry = registry
def _is_server_op(self, peer_hash: bytes | None) -> bool:
return self._is_trusted(peer_hash)
def _is_room_op(self, room: str, peer_hash: bytes | None) -> bool:
if peer_hash is None:
return False
if self._is_server_op(peer_hash):
return True
st = self._room_state_ensure(room)
founder = st.get("founder")
if isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash:
return True
ops = st.get("ops")
return isinstance(ops, set) and peer_hash in ops
def _is_room_voiced(self, room: str, peer_hash: bytes | None) -> bool:
if peer_hash is None:
return False
if self._is_room_op(room, peer_hash):
return True
st = self._room_state_ensure(room)
voiced = st.get("voiced")
return isinstance(voiced, set) and peer_hash in voiced
def _is_room_banned(self, room: str, peer_hash: bytes | None) -> bool:
if peer_hash is None:
return False
st = self._room_state_ensure(room)
bans = st.get("bans")
return isinstance(bans, set) and peer_hash in bans
def _room_moderated(self, room: str) -> bool:
st = self._room_state_ensure(room)
return bool(st.get("moderated", False))
def _resolve_identity_hash(
self, token: str, *, room: str | None = None
) -> bytes | None:
@@ -1160,211 +791,6 @@ class HubService:
except Exception:
return (None, [])
def _persist_room_state_to_registry(self, link: RNS.Link, room: str | None) -> None:
if room is None:
return
reg_path = self._room_registry_path_for_writes()
if not reg_path:
return
st = self._room_state_get(room)
if not st or not st.get("registered"):
return
try:
from tomlkit import dumps, parse, table # type: ignore
except Exception:
return
try:
with self._room_registry_write_lock:
file_stat = None
try:
file_stat = os.stat(reg_path)
except Exception:
file_stat = None
with open(reg_path, encoding="utf-8") as f:
doc = parse(f.read())
rooms = doc.get("rooms")
if rooms is None:
rooms = table()
doc["rooms"] = rooms
room_tbl = rooms.get(room)
if room_tbl is None:
room_tbl = table()
rooms[room] = room_tbl
founder = st.get("founder")
if isinstance(founder, (bytes, bytearray)):
room_tbl["founder"] = bytes(founder).hex()
topic = st.get("topic")
if isinstance(topic, str) and topic.strip():
room_tbl["topic"] = topic
else:
if "topic" in room_tbl:
del room_tbl["topic"]
room_tbl["moderated"] = bool(st.get("moderated", False))
room_tbl["invite_only"] = bool(st.get("invite_only", False))
room_tbl["topic_ops_only"] = bool(st.get("topic_ops_only", False))
room_tbl["no_outside_msgs"] = bool(st.get("no_outside_msgs", False))
key = st.get("key")
if isinstance(key, str) and key:
room_tbl["key"] = key
else:
if "key" in room_tbl:
del room_tbl["key"]
last_used_ts = st.get("last_used_ts")
if last_used_ts is None:
last_used_ts = float(time.time())
try:
room_tbl["last_used_ts"] = float(last_used_ts)
except Exception:
room_tbl["last_used_ts"] = float(time.time())
ops = st.get("ops")
if isinstance(ops, set):
room_tbl["operators"] = sorted(
bytes(x).hex() for x in ops if isinstance(x, (bytes, bytearray))
)
voiced = st.get("voiced")
if isinstance(voiced, set):
room_tbl["voiced"] = sorted(
bytes(x).hex()
for x in voiced
if isinstance(x, (bytes, bytearray))
)
bans = st.get("bans")
if isinstance(bans, set):
room_tbl["bans"] = sorted(
bytes(x).hex()
for x in bans
if isinstance(x, (bytes, bytearray))
)
invited = st.get("invited")
if isinstance(invited, dict):
inv_tbl = {}
now = float(time.time())
for h, exp in invited.items():
if not isinstance(h, (bytes, bytearray)):
continue
try:
exp_f = float(exp)
except Exception:
continue
if exp_f > now:
inv_tbl[bytes(h).hex()] = exp_f
room_tbl["invited"] = inv_tbl
new_text = dumps(doc)
with open(reg_path, "w", encoding="utf-8") as f:
f.write(new_text)
if file_stat is not None:
try:
os.chmod(reg_path, file_stat.st_mode)
except Exception:
pass
except Exception as e:
self._notice_to(link, room, f"room config persist failed: {e}")
def _delete_room_from_registry(self, link: RNS.Link, room: str) -> None:
reg_path = self._room_registry_path_for_writes()
if not reg_path:
return
try:
from tomlkit import dumps, parse # type: ignore
except Exception:
return
try:
with self._room_registry_write_lock:
file_stat = None
try:
file_stat = os.stat(reg_path)
except Exception:
file_stat = None
with open(reg_path, encoding="utf-8") as f:
doc = parse(f.read())
rooms = doc.get("rooms")
if isinstance(rooms, dict) and room in rooms:
try:
del rooms[room]
except Exception:
rooms.pop(room, None)
new_text = dumps(doc)
with open(reg_path, "w", encoding="utf-8") as f:
f.write(new_text)
if file_stat is not None:
try:
os.chmod(reg_path, file_stat.st_mode)
except Exception:
pass
except Exception as e:
self._notice_to(link, room, f"room unregister persist failed: {e}")
def _prune_loop(self) -> None:
while not self._shutdown.is_set():
interval = float(self.config.room_registry_prune_interval_s)
prune_after = float(self.config.room_registry_prune_after_s)
if interval <= 0 or prune_after <= 0:
time.sleep(1.0)
continue
time.sleep(interval)
if self._shutdown.is_set():
break
now = float(time.time())
rooms_to_prune: list[str] = []
dummy_link: RNS.Link | None = None
with self._state_lock:
dummy_link = next(iter(self.session_manager.sessions.keys()), None)
for room, reg in list(self._room_registry.items()):
# Skip active rooms.
if room in self.rooms and self.rooms.get(room):
continue
last_used = reg.get("last_used_ts")
try:
last_used = float(last_used) if last_used is not None else None
except Exception:
last_used = None
if last_used is None:
# Never-used rooms are eligible after prune_after from process start.
last_used = self._started_wall_time or now
if (now - float(last_used)) < prune_after:
continue
# Prune in-memory under lock.
self._room_registry.pop(room, None)
self._room_state.pop(room, None)
rooms_to_prune.append(room)
if dummy_link is not None:
for room in rooms_to_prune:
self._delete_room_from_registry(dummy_link, room)
for room in rooms_to_prune:
self.log.info("Pruned unused registered room %s", room)
def _resource_cleanup_loop(self) -> None:
"""Periodically cleanup expired resource expectations."""
while not self._shutdown.is_set():
@@ -1377,6 +803,35 @@ class HubService:
except Exception:
self.log.exception("Resource cleanup failed")
def _prune_loop(self) -> None:
"""Periodically prune unused registered rooms."""
while not self._shutdown.is_set():
interval = float(self.config.room_registry_prune_interval_s)
prune_after = float(self.config.room_registry_prune_after_s)
if interval <= 0 or prune_after <= 0:
time.sleep(1.0)
continue
time.sleep(interval)
if self._shutdown.is_set():
break
rooms_to_prune: list[str] = []
dummy_link: RNS.Link | None = None
with self._state_lock:
dummy_link = next(iter(self.session_manager.sessions.keys()), None)
rooms_to_prune = self.room_manager.prune_unused_registered_rooms(
prune_after, self._started_wall_time or time.time()
)
if dummy_link is not None:
for room in rooms_to_prune:
self.room_manager.delete_room_from_registry(dummy_link, room)
for room in rooms_to_prune:
self.log.info("Pruned unused registered room %s", room)
def _config_path_for_writes(self) -> str | None:
p = self.config.config_path
if not p:
@@ -1519,13 +974,10 @@ class HubService:
sessions_welcomed = session_stats["welcomed"]
sessions_identified = session_stats["identified"]
rooms_total = len(self.rooms)
memberships = sum(len(v) for v in self.rooms.values())
top_rooms = sorted(
((room, len(links)) for room, links in self.rooms.items()),
key=lambda x: (-x[1], x[0]),
)[:5]
room_stats = self.room_manager.get_stats()
rooms_total = room_stats["rooms_total"]
memberships = room_stats["memberships"]
top_rooms = room_stats["top_rooms"]
trusted_count = len(self._trusted)
banned_count = len(self._banned)
@@ -1652,6 +1104,8 @@ class HubService:
peer_hash=sess.get("peer"),
motd=self.config.greeting,
)
# Send queued WELCOME first
for out_link, payload in outgoing:
self._inc("bytes_out", len(payload))
try:
@@ -1670,6 +1124,21 @@ class HubService:
len(payload),
exc_info=True,
)
# Now send MOTD via resource or chunks (after WELCOME is sent)
if self.config.greeting:
self.log.debug(
"Sending MOTD link_id=%s len=%s",
self._fmt_link_id(link),
len(self.config.greeting),
)
self._send_text_smart(
link,
msg_type=T_NOTICE,
text=self.config.greeting,
room=None,
kind=RES_KIND_MOTD,
)
def _on_close(self, link: RNS.Link) -> None:
peer = None
@@ -1731,7 +1200,7 @@ class HubService:
# Packet callbacks can occur concurrently with other link callbacks and
# background worker threads. Keep state mutations under the shared lock,
# but avoid holding the lock while sending packets via RNS.
outgoing: list[tuple[RNS.Link, bytes]] = []
outgoing: list[tuple[RNS.Link, bytes]] = OutgoingList()
with self._state_lock:
self._on_packet_locked(link, data, outgoing)
@@ -1760,6 +1229,14 @@ class HubService:
len(payload),
exc_info=True,
)
# Execute any post-send callbacks (e.g., for MOTD after WELCOME)
if hasattr(outgoing, '_post_send_callbacks'):
for callback in outgoing._post_send_callbacks: # type: ignore
try:
callback()
except Exception:
self.log.exception("Post-send callback failed")
def _on_packet_locked(
self,
+1 -6
View File
@@ -130,12 +130,7 @@ class SessionManager:
# Clean up room memberships
for room in list(sess["rooms"]):
self.hub.rooms.get(room, set()).discard(link)
if room in self.hub.rooms and not self.hub.rooms[room]:
self.hub.rooms.pop(room, None)
st = self.hub._room_state_get(room)
if st is not None and not st.get("registered"):
self.hub._room_state.pop(room, None)
self.hub.room_manager.remove_member(room, link)
return peer, nick, rooms_count