diff --git a/rrcd/commands.py b/rrcd/commands.py index c08b45a..13747ad 100644 --- a/rrcd/commands.py +++ b/rrcd/commands.py @@ -72,7 +72,7 @@ class CommandHandler: ) return True # Send response without room field for hub-level command - self._emit_notice(outgoing, link, None, self.hub.stats_manager.format_stats()) + self.hub.message_helper.emit_notice(outgoing, link, None, self.hub.stats_manager.format_stats()) return True if cmd == "list": @@ -92,7 +92,7 @@ class CommandHandler: registered_rooms.append((room_name, topic)) if not registered_rooms: - self._emit_notice(outgoing, link, None, "No public rooms registered") + self.hub.message_helper.emit_notice(outgoing, link, None, "No public rooms registered") return True # Sort rooms alphabetically @@ -106,7 +106,7 @@ class CommandHandler: else: lines.append(f" {room_name}") - self._emit_notice(outgoing, link, None, "\n".join(lines)) + self.hub.message_helper.emit_notice(outgoing, link, None, "\n".join(lines)) return True if cmd in ("who", "names"): @@ -114,19 +114,19 @@ class CommandHandler: if len(parts) >= 2: target_room = parts[1] if not isinstance(target_room, str) or not target_room: - self._emit_notice(outgoing, link, None, "usage: /who [room]") + self.hub.message_helper.emit_notice(outgoing, link, None, "usage: /who [room]") return True try: r = self.hub._norm_room(target_room) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad room: {e}") return True # Check if room is private - only server operators can see private rooms st = self.hub.room_manager._room_state_get(r) if st and st.get("private"): if not self.hub.trust_manager.is_server_op(peer_hash): - self._emit_notice(outgoing, link, None, f"room {r} is private") + self.hub.message_helper.emit_notice(outgoing, link, None, f"room {r} is private") return True members = [] @@ -142,7 +142,7 @@ class CommandHandler: else: members.append(ident) # Send response without room field for hub-level query - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, @@ -152,7 +152,7 @@ class CommandHandler: if cmd == "kick": if len(parts) < 3: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, "usage: /kick " ) return True @@ -161,7 +161,7 @@ class CommandHandler: try: r = self.hub._norm_room(target_room) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"bad room: {e}") return True if not self.hub.room_manager.is_room_op(r, peer_hash): @@ -179,14 +179,14 @@ class CommandHandler: if target_link is None: # Check if ambiguous or just not found all_matches = self._find_target_links(target, room=r) - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, self._format_ambiguous_targets(target, all_matches) ) return True tsess = self.hub.session_manager.sessions.get(target_link) if not tsess or r not in tsess.get("rooms", set()): - self._emit_notice(outgoing, link, room, "target not in room") + self.hub.message_helper.emit_notice(outgoing, link, room, "target not in room") return True tsess["rooms"].discard(r) @@ -203,7 +203,7 @@ class CommandHandler: text=f"kicked from {r}", room=r, ) - self._emit_notice(outgoing, link, room, f"kicked {target} from {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"kicked {target} from {r}") return True if cmd == "kline": @@ -220,7 +220,7 @@ class CommandHandler: # Hub-level command - all responses without room field if len(parts) < 2: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, @@ -232,7 +232,7 @@ class CommandHandler: if op == "list": with self.hub._state_lock: items = sorted(h.hex() for h in self.hub.trust_manager._banned) - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, @@ -241,7 +241,7 @@ class CommandHandler: return True if op not in ("add", "del"): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, @@ -250,7 +250,7 @@ class CommandHandler: return True if len(parts) < 3: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, f"usage: /kline {op} " ) return True @@ -268,14 +268,14 @@ class CommandHandler: target_link.teardown() except Exception: pass - self._emit_notice(outgoing, link, None, f"kline added for {target}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"kline added for {target}") return True # Not found as active link - check if ambiguous or try as raw hash all_matches = self._find_target_links(target, room=None) if all_matches: # Ambiguous - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, self._format_ambiguous_targets(target, all_matches) ) return True @@ -284,37 +284,37 @@ class CommandHandler: try: h = self.hub._parse_identity_hash(target) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad identity hash: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad identity hash: {e}") return True self.hub.trust_manager.add_ban(h) self.hub.trust_manager.persist_banned_identities_to_config(link, None, outgoing) - self._emit_notice(outgoing, link, None, f"kline added for {h.hex()}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"kline added for {h.hex()}") return True # op == "del" try: h = self.hub._parse_identity_hash(target) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad identity hash: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad identity hash: {e}") return True if self.hub.trust_manager.is_banned(h): self.hub.trust_manager.remove_ban(h) self.hub.trust_manager.persist_banned_identities_to_config(link, None, outgoing) - self._emit_notice(outgoing, link, None, f"kline removed for {h.hex()}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"kline removed for {h.hex()}") else: - self._emit_notice(outgoing, link, None, f"not klined: {h.hex()}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"not klined: {h.hex()}") return True # Room-scoped moderation and maintenance if cmd == "register": if len(parts) < 2: - self._emit_notice(outgoing, link, None, "usage: /register ") + self.hub.message_helper.emit_notice(outgoing, link, None, "usage: /register ") return True try: r = self.hub._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad room: {e}") return True # Registration rules: requester must be in the room and must be the founder. # (No server-op override by design.) @@ -323,7 +323,7 @@ class CommandHandler: or self.hub._norm_room(room) != r or r not in self.hub.session_manager.sessions.get(link, {}).get("rooms", set()) ): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, "must be present in the room to register it" ) return True @@ -348,7 +348,7 @@ class CommandHandler: return True if not self.hub.room_manager.get_registry_path_for_writes(): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, "cannot register room: no room_registry_path" ) return True @@ -381,17 +381,17 @@ class CommandHandler: } self.hub.room_manager.persist_room_state(link, r) - self._emit_notice(outgoing, link, room, f"registered room {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"registered room {r}") return True if cmd == "unregister": if len(parts) < 2: - self._emit_notice(outgoing, link, None, "usage: /unregister ") + self.hub.message_helper.emit_notice(outgoing, link, None, "usage: /unregister ") return True try: r = self.hub._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad room: {e}") return True if ( @@ -399,7 +399,7 @@ class CommandHandler: or self.hub._norm_room(room) != r or r not in self.hub.session_manager.sessions.get(link, {}).get("rooms", set()) ): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, "must be present in the room to unregister it" ) return True @@ -420,7 +420,7 @@ class CommandHandler: return True if not st.get("registered"): - self._emit_notice(outgoing, link, room, f"room {r} is not registered") + self.hub.message_helper.emit_notice(outgoing, link, room, f"room {r} is not registered") return True st["registered"] = False @@ -429,22 +429,22 @@ class CommandHandler: # Drop state if empty. if not self.hub.room_manager.get_room_members(r) or not self.hub.room_manager.get_room_members(r): self.hub.room_manager._room_state.pop(r, None) - self._emit_notice(outgoing, link, room, f"unregistered room {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"unregistered room {r}") return True if cmd == "topic": if len(parts) < 2: - self._emit_notice(outgoing, link, None, "usage: /topic [topic]") + self.hub.message_helper.emit_notice(outgoing, link, None, "usage: /topic [topic]") return True try: r = self.hub._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad room: {e}") return True st = self.hub.room_manager._room_state_ensure(r) if len(parts) == 2: topic = st.get("topic") - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, @@ -471,7 +471,7 @@ class CommandHandler: self.hub.room_manager.persist_room_state(link, r) # Broadcast topic change to current members. for other in list(self.hub.room_manager.get_room_members(r)): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, other, r, @@ -481,14 +481,14 @@ class CommandHandler: if cmd in ("op", "deop", "voice", "devoice"): if len(parts) < 3: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, f"usage: /{cmd} " ) return True try: r = self.hub._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad room: {e}") return True if not self.hub.room_manager.is_room_op(r, peer_hash): if self.hub.identity is not None: @@ -503,7 +503,7 @@ class CommandHandler: target_hash, all_matches = self.hub._resolve_identity_hash_with_matches(parts[2], room=r) if target_hash is None: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, self._format_ambiguous_targets(parts[2], all_matches) ) return True @@ -523,16 +523,16 @@ class CommandHandler: ops.add(target_hash) self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) - self._emit_notice(outgoing, link, room, f"op granted in {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"op granted in {r}") return True else: if founder_b is not None and target_hash == founder_b: - self._emit_notice(outgoing, link, room, "cannot deop founder") + self.hub.message_helper.emit_notice(outgoing, link, room, "cannot deop founder") return True ops.discard(target_hash) self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) - self._emit_notice(outgoing, link, room, f"op removed in {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"op removed in {r}") return True voiced = st.setdefault("voiced", set()) @@ -543,18 +543,18 @@ class CommandHandler: voiced.add(target_hash) self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) - self._emit_notice(outgoing, link, room, f"voice granted in {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"voice granted in {r}") return True else: voiced.discard(target_hash) self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) - self._emit_notice(outgoing, link, room, f"voice removed in {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"voice removed in {r}") return True if cmd == "mode": if len(parts) < 3: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, @@ -564,7 +564,7 @@ class CommandHandler: try: r = self.hub._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad room: {e}") return True if not self.hub.room_manager.is_room_op(r, peer_hash): if self.hub.identity is not None: @@ -617,13 +617,13 @@ class CommandHandler: if flag in ("+k", "-k"): if flag == "+k": if len(parts) < 4: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, "usage: /mode +k " ) return True key = " ".join(parts[3:]).strip() if not key: - self._emit_notice(outgoing, link, room, "key must not be empty") + self.hub.message_helper.emit_notice(outgoing, link, room, "key must not be empty") return True st["key"] = key else: @@ -634,14 +634,14 @@ class CommandHandler: return True if flag in ("+r", "-r"): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, "use /register or /unregister to change +r" ) return True if flag in ("+o", "-o", "+v", "-v"): if len(parts) < 4: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, @@ -651,7 +651,7 @@ class CommandHandler: target_hash, all_matches = self.hub._resolve_identity_hash_with_matches(parts[3], room=r) if target_hash is None: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches) ) return True @@ -671,7 +671,7 @@ class CommandHandler: ops.add(target_hash) else: if founder_b is not None and target_hash == founder_b: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, "cannot deop founder" ) return True @@ -680,7 +680,7 @@ class CommandHandler: self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) for other in list(self.hub.room_manager.get_room_members(r)): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, other, r, @@ -700,7 +700,7 @@ class CommandHandler: self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) for other in list(self.hub.room_manager.get_room_members(r)): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, other, r, @@ -708,7 +708,7 @@ class CommandHandler: ) return True - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, @@ -718,7 +718,7 @@ class CommandHandler: if cmd == "ban": if len(parts) < 3: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, @@ -729,7 +729,7 @@ class CommandHandler: try: r = self.hub._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad room: {e}") return True op = parts[2].strip().lower() @@ -737,18 +737,18 @@ class CommandHandler: st = self.hub.room_manager._room_state_ensure(r) bans = st.get("bans") if not isinstance(bans, set) or not bans: - self._emit_notice(outgoing, link, room, f"no bans in {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"no bans in {r}") return True items = sorted( bytes(x).hex() for x in bans if isinstance(x, (bytes, bytearray)) ) - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, f"bans in {r}: " + ", ".join(items) ) return True if op not in ("add", "del"): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, @@ -757,7 +757,7 @@ class CommandHandler: return True if len(parts) < 4: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, f"usage: /ban {r} {op} " ) return True @@ -775,7 +775,7 @@ class CommandHandler: target_hash, all_matches = self.hub._resolve_identity_hash_with_matches(parts[3], room=r) if target_hash is None: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches) ) return True @@ -808,18 +808,18 @@ class CommandHandler: ) if self.hub.room_manager.get_room_members(r) and not self.hub.room_manager.rooms[r]: pass # room cleanup handled by room_manager - self._emit_notice(outgoing, link, room, f"ban added in {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"ban added in {r}") return True bans.discard(target_hash) self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) - self._emit_notice(outgoing, link, room, f"ban removed in {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"ban removed in {r}") return True if cmd == "invite": if len(parts) < 3: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, None, @@ -830,7 +830,7 @@ class CommandHandler: try: r = self.hub._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, None, f"bad room: {e}") + self.hub.message_helper.emit_notice(outgoing, link, None, f"bad room: {e}") return True if not self.hub.room_manager.is_room_op(r, peer_hash): @@ -872,7 +872,7 @@ class CommandHandler: if pruned: self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, @@ -881,7 +881,7 @@ class CommandHandler: return True if op not in ("add", "del"): - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, @@ -890,7 +890,7 @@ class CommandHandler: return True if len(parts) < 4: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, @@ -934,14 +934,14 @@ class CommandHandler: is_invite_only = bool(st.get("invite_only", False)) if is_keyed: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, target_link, r, f"You have been invited to join {r}. This invite allows joining without the key (+k).", ) else: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, target_link, r, f"You have been invited to join {r}." ) @@ -958,21 +958,21 @@ class CommandHandler: invited[target_hash] = exp self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, f"invite added in {r} (expires in {int(ttl)}s)", ) else: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, f"invite sent to {token} for {r}" ) return True target_hash, all_matches = self.hub._resolve_identity_hash_with_matches(parts[3], room=None) if target_hash is None: - self._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches) ) return True @@ -981,7 +981,7 @@ class CommandHandler: invited.pop(target_hash, None) self.hub.room_manager.touch_room(r) self.hub.room_manager.persist_room_state(link, r) - self._emit_notice(outgoing, link, room, f"invite removed in {r}") + self.hub.message_helper.emit_notice(outgoing, link, room, f"invite removed in {r}") return True return False diff --git a/rrcd/messages.py b/rrcd/messages.py new file mode 100644 index 0000000..9b16124 --- /dev/null +++ b/rrcd/messages.py @@ -0,0 +1,300 @@ +"""Message sending and queueing utilities for the RRC hub.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import RNS + +from .codec import encode +from .constants import T_ERROR, T_NOTICE, T_WELCOME, B_WELCOME_HUB, B_WELCOME_VER +from .envelope import make_envelope + +if TYPE_CHECKING: + from .service import HubService + + +class MessageHelper: + """ + Helper methods for sending and queueing messages. + + Handles: + - Message queueing (outgoing lists) + - Notice chunking for large messages + - WELCOME message construction + - Error and notice emission + - Smart text sending (resource vs chunks) + """ + + def __init__(self, hub: HubService) -> None: + self.hub = hub + self.log = hub.log + + def packet_would_fit(self, link: RNS.Link, payload: bytes) -> bool: + """Check if payload fits within link MDU without creating/packing packets.""" + try: + # Query link MDU directly if available (more efficient than packing) + if hasattr(link, 'MDU') and link.MDU is not None: + return len(payload) <= link.MDU + # Fall back to packet creation if MDU not available + pkt = RNS.Packet(link, payload) + pkt.pack() + return True + except Exception: + return False + + def queue_payload( + self, outgoing: list[tuple[RNS.Link, bytes]], link: RNS.Link, payload: bytes + ) -> None: + """Add a raw payload to the outgoing queue.""" + self.hub.stats_manager.inc("bytes_out", len(payload)) + outgoing.append((link, payload)) + + def queue_env( + self, outgoing: list[tuple[RNS.Link, bytes]], link: RNS.Link, env: dict + ) -> None: + """Encode and queue an envelope.""" + payload = encode(env) + self.queue_payload(outgoing, link, payload) + + def queue_notice_chunks( + self, + outgoing: list[tuple[RNS.Link, bytes]], + link: RNS.Link, + *, + room: str | None, + text: str, + ) -> None: + """Split and queue a notice message into MTU-sized chunks.""" + if self.hub.identity is None: + return + if not text: + return + + # Prefer splitting on lines for readability. If a single line is too + # large, further split it by characters using a pack preflight. + lines = text.splitlines() or [text] + for line in lines: + remaining = line + if not remaining: + continue + + # Start with a generous chunk size; shrink on demand. + max_chars = min(len(remaining), 512) + while remaining: + take = min(len(remaining), max_chars) + chunk = remaining[:take] + env = make_envelope( + T_NOTICE, + src=self.hub.identity.hash, + room=room, + body=chunk, + ) + payload = encode(env) + if self.packet_would_fit(link, payload): + self.queue_payload(outgoing, link, payload) + remaining = remaining[take:] + max_chars = min(max_chars, 512) + continue + + if max_chars <= 1: + # Nothing we can do; avoid an infinite loop. + self.log.warning( + "NOTICE chunk would not fit MTU; dropping remainder (%s chars)", + len(remaining), + ) + break + + max_chars = max(1, max_chars // 2) + + def queue_welcome( + self, + outgoing: list[tuple[RNS.Link, bytes]], + link: RNS.Link, + *, + peer_hash: Any, + motd: str | None, + ) -> None: + """Queue a WELCOME message for a newly connected peer.""" + if self.hub.identity is None: + return + + from . import __version__ + + g = str(motd) if motd else "" + body_w: dict[int, Any] = { + B_WELCOME_HUB: self.hub.config.hub_name, + B_WELCOME_VER: str(__version__), + } + # Capabilities are optional; keep WELCOME minimal unless needed. + + welcome = make_envelope(T_WELCOME, src=self.hub.identity.hash, body=body_w) + welcome_payload = encode(welcome) + + if not self.packet_would_fit(link, welcome_payload): + self.log.warning( + "WELCOME would not fit MTU; cannot welcome peer=%s link_id=%s", + self.hub._fmt_hash(peer_hash), + self.hub._fmt_link_id(link), + ) + return + + self.queue_payload(outgoing, link, welcome_payload) + self.log.debug( + "Queued WELCOME peer=%s link_id=%s", + self.hub._fmt_hash(peer_hash), + self.hub._fmt_link_id(link), + ) + + def send_text_smart( + self, + link: RNS.Link, + *, + msg_type: int, + text: str, + room: str | None = None, + kind: str | None = None, + outgoing: list[tuple[RNS.Link, bytes]] | None = None, + encoding: str = "utf-8", + ) -> None: + """ + Send text message using the most efficient method: + - Resource transfer for large messages (if enabled and outgoing is None) + - Chunked messages otherwise + """ + from .constants import RES_KIND_MOTD, RES_KIND_NOTICE + + # Determine resource kind if not specified + resource_kind = kind + if resource_kind is None: + resource_kind = RES_KIND_MOTD if msg_type == T_NOTICE and room is None else RES_KIND_NOTICE + + # Try resource transfer if enabled, outgoing is None, and message is large enough + if ( + self.hub.config.enable_resource_transfer + and outgoing is None + and len(text.encode(encoding, errors="replace")) > 512 + ): + self.log.debug( + "Attempting resource transfer link_id=%s kind=%s chars=%s", + self.hub._fmt_link_id(link), + resource_kind, + len(text), + ) + if self.hub.resource_manager.send_via_resource( + link, + kind=resource_kind, + payload=text.encode(encoding, errors="replace"), + room=room, + encoding=encoding, + ): + self.log.debug( + "Sent large text via resource link_id=%s kind=%s chars=%s", + self.hub._fmt_link_id(link), + resource_kind, + len(text), + ) + return + else: + self.log.warning( + "Resource send failed, falling back to chunks link_id=%s", + self.hub._fmt_link_id(link), + ) + + # Fall back to chunking for NOTICE + if msg_type == T_NOTICE: + self.log.debug( + "Falling back to chunking link_id=%s outgoing_is_none=%s", + self.hub._fmt_link_id(link), + outgoing is None, + ) + if outgoing is None: + outgoing = [] + self.queue_notice_chunks(outgoing, link, room=room, text=text) + for out_link, chunk_payload in outgoing: + self.hub.stats_manager.inc("bytes_out", len(chunk_payload)) + try: + RNS.Packet(out_link, chunk_payload).send() + except Exception as e: + self.log.warning( + "Failed to send chunk link_id=%s: %s", + self.hub._fmt_link_id(out_link), + e, + ) + else: + self.queue_notice_chunks(outgoing, link, room=room, text=text) + else: + # For other message types, just drop or log error + self.log.error( + "Message too large and not NOTICE link_id=%s type=%s", + self.hub._fmt_link_id(link), + msg_type, + ) + + def emit_notice( + self, + outgoing: list[tuple[RNS.Link, bytes]] | None, + link: RNS.Link, + room: str | None, + text: str, + ) -> None: + """Emit a notice message (queued or immediate).""" + if self.hub.identity is None: + return + env = make_envelope(T_NOTICE, src=self.hub.identity.hash, room=room, body=text) + if outgoing is None: + self.send(link, env) + else: + self.queue_env(outgoing, link, env) + + def emit_error( + self, + outgoing: list[tuple[RNS.Link, bytes]] | None, + link: RNS.Link, + *, + src: bytes, + text: str, + room: str | None = None, + ) -> None: + """Emit an error message (queued or immediate).""" + self.hub.stats_manager.inc("errors_sent") + env = make_envelope(T_ERROR, src=src, room=room, body=text) + if outgoing is None: + self.send(link, env) + else: + self.queue_env(outgoing, link, env) + + def notice_to(self, link: RNS.Link, room: str | None, text: str) -> None: + """Send a notice message immediately.""" + if self.hub.identity is None: + return + env = make_envelope(T_NOTICE, src=self.hub.identity.hash, room=room, body=text) + self.send(link, env) + + def error( + self, link: RNS.Link, src: bytes, text: str, room: str | None = None + ) -> None: + """Send an error message immediately.""" + self.emit_error(None, link, src=src, text=text, room=room) + + def send(self, link: RNS.Link, env: dict) -> None: + """Send an envelope immediately (not queued).""" + payload = encode(env) + self.hub.stats_manager.inc("bytes_out", len(payload)) + try: + RNS.Packet(link, payload).send() + except OSError as e: + # Common failure mode on low-MTU links: packet too large. + self.log.warning( + "Send failed link_id=%s bytes=%s err=%s", + self.hub._fmt_link_id(link), + len(payload), + e, + ) + except Exception: + self.log.debug( + "Send failed link_id=%s bytes=%s", + self.hub._fmt_link_id(link), + len(payload), + exc_info=True, + ) diff --git a/rrcd/rooms.py b/rrcd/rooms.py index f27401c..8654fc2 100644 --- a/rrcd/rooms.py +++ b/rrcd/rooms.py @@ -225,7 +225,7 @@ class RoomManager: mode_txt = self.get_room_mode_string(room) recipients = list(self.get_room_members(room)) for other in recipients: - self.hub._emit_notice( + self.hub.message_helper.emit_notice( outgoing, other, room, f"mode for {room} is now: {mode_txt}" ) @@ -567,7 +567,7 @@ class RoomManager: except Exception: pass except Exception as e: - self.hub._notice_to(link, room, f"room config persist failed: {e}") + self.hub.message_helper.notice_to(link, room, f"room config persist failed: {e}") def delete_room_from_registry(self, link: RNS.Link, room: str) -> None: """Remove a room from the registry TOML file.""" @@ -607,7 +607,7 @@ class RoomManager: except Exception: pass except Exception as e: - self.hub._notice_to(link, room, f"room unregister persist failed: {e}") + self.hub.message_helper.notice_to(link, room, f"room unregister persist failed: {e}") def prune_unused_registered_rooms( self, prune_after_s: float, started_wall_time: float diff --git a/rrcd/router.py b/rrcd/router.py index 2f19534..72fb343 100644 --- a/rrcd/router.py +++ b/rrcd/router.py @@ -96,7 +96,7 @@ class MessageRouter: self.hub._fmt_link_id(link), ) if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, text="rate limited" ) return @@ -114,7 +114,7 @@ class MessageRouter: e, ) if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, text=f"bad message: {e}" ) return @@ -176,7 +176,7 @@ class MessageRouter: if not self.hub.config.enable_resource_transfer: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -187,7 +187,7 @@ class MessageRouter: if not isinstance(body, dict): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -205,7 +205,7 @@ class MessageRouter: # Validate required fields if not isinstance(rid, (bytes, bytearray)): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -216,7 +216,7 @@ class MessageRouter: if not isinstance(kind, str) or not kind: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -227,7 +227,7 @@ class MessageRouter: if not isinstance(size, int) or size < 0: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -239,7 +239,7 @@ class MessageRouter: # Check size limit if size > self.hub.config.max_resource_bytes: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -251,7 +251,7 @@ class MessageRouter: # Validate optional fields if sha256 is not None and not isinstance(sha256, (bytes, bytearray)): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -274,7 +274,7 @@ class MessageRouter: room=room, ): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -297,7 +297,7 @@ class MessageRouter: if t != T_HELLO: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, text="send HELLO first" ) return @@ -347,7 +347,7 @@ class MessageRouter: # The outgoing queue will be sent first, then this callback will send the MOTD if self.hub.config.greeting: def send_motd(): - self.hub._send_text_smart( + self.hub.message_helper.send_text_smart( link, msg_type=T_NOTICE, text=self.hub.config.greeting, @@ -440,7 +440,7 @@ class MessageRouter: self.hub.stats_manager.inc("joins") if not isinstance(room, str) or not room: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -450,7 +450,7 @@ class MessageRouter: if len(sess["rooms"]) >= int(self.hub.config.max_rooms_per_session): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, text="too many rooms" ) return @@ -459,7 +459,7 @@ class MessageRouter: r = self.hub._norm_room(room) except Exception as e: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, text=str(e) ) return @@ -475,7 +475,7 @@ class MessageRouter: is_invited = self.hub.room_manager.is_invited(r, peer_hash) if not self.hub.room_manager.is_room_op(r, peer_hash) and not is_invited: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -492,7 +492,7 @@ class MessageRouter: provided = body if isinstance(body, str) else None if provided != key: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -504,7 +504,7 @@ class MessageRouter: # Room bans are room-local and apply to JOIN. if self.hub.room_manager.is_room_banned(r, peer_hash): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -544,7 +544,7 @@ class MessageRouter: joined = make_envelope( T_JOINED, src=self.hub.identity.hash, room=r, body=joined_body ) - self.hub._queue_env(outgoing, link, joined) + self.hub.message_helper.queue_env(outgoing, link, joined) # Consume invite on successful join. try: @@ -562,7 +562,7 @@ class MessageRouter: mode_txt = self.hub.room_manager.get_room_mode_string(r) topic_txt = topic if topic else "(none)" reg_txt = "registered" if registered else "unregistered" - self.hub._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, r, @@ -585,7 +585,7 @@ class MessageRouter: self.hub.stats_manager.inc("parts") if not isinstance(room, str) or not room: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -597,7 +597,7 @@ class MessageRouter: r = self.hub._norm_room(room) except Exception as e: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, text=str(e) ) return @@ -630,7 +630,7 @@ class MessageRouter: parted = make_envelope( T_PARTED, src=self.hub.identity.hash, room=r, body=parted_body ) - self.hub._queue_env(outgoing, link, parted) + self.hub.message_helper.queue_env(outgoing, link, parted) self.log.info( "PART peer=%s nick=%r room=%s link_id=%s", @@ -679,7 +679,7 @@ class MessageRouter: return # Unrecognized slash command - send error if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -693,7 +693,7 @@ class MessageRouter: if t == T_MSG: if not isinstance(room, str) or not room: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -709,7 +709,7 @@ class MessageRouter: r = self.hub._norm_room(room) except Exception as e: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, text=str(e) ) return @@ -725,7 +725,7 @@ class MessageRouter: if st is None: if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -736,7 +736,7 @@ class MessageRouter: if bool(st.get("no_outside_msgs", False)): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -748,7 +748,7 @@ class MessageRouter: # Per-room moderation: bans and moderated mode. if self.hub.room_manager.is_room_banned(r, peer_hash): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -758,7 +758,7 @@ class MessageRouter: return if self.hub.room_manager.is_room_moderated(r) and not self.hub.room_manager.is_room_voiced(r, peer_hash): if self.hub.identity is not None: - self.hub._emit_error( + self.hub.message_helper.emit_error( outgoing, link, src=self.hub.identity.hash, @@ -801,7 +801,7 @@ class MessageRouter: payload = encode(env) for other in list(self.hub.room_manager.get_room_members(r)): - self.hub._queue_payload(outgoing, other, payload) + self.hub.message_helper.queue_payload(outgoing, other, payload) if self.log.isEnabledFor(logging.DEBUG): self.log.debug( @@ -832,7 +832,7 @@ class MessageRouter: if self.hub.identity is not None: pong = make_envelope(T_PONG, src=self.hub.identity.hash, body=body) self.hub.stats_manager.inc("pongs_out") - self.hub._queue_env(outgoing, link, pong) + self.hub.message_helper.queue_env(outgoing, link, pong) def _extract_caps(self, body: Any) -> dict[int, Any]: """Extract capabilities from HELLO body.""" diff --git a/rrcd/service.py b/rrcd/service.py index 53a1e04..3ca0b83 100644 --- a/rrcd/service.py +++ b/rrcd/service.py @@ -26,6 +26,7 @@ from .constants import ( ) from .envelope import make_envelope from .logging_config import configure_logging +from .messages import MessageHelper from .resources import ResourceManager from .rooms import RoomManager from .router import MessageRouter, OutgoingList @@ -70,6 +71,9 @@ class HubService: # Config manager for configuration loading and reloading self.config_manager = ConfigManager(self) + + # Message helper for sending and queueing messages + self.message_helper = MessageHelper(self) self.identity: RNS.Identity | None = None self.destination: RNS.Destination | None = None @@ -97,109 +101,11 @@ class HubService: return bytes(h).hex() return "-" - def _packet_would_fit(self, link: RNS.Link, payload: bytes) -> bool: - """Check if payload fits within link MDU without creating/packing packets.""" - try: - # Query link MDU directly if available (more efficient than packing) - if hasattr(link, 'MDU') and link.MDU is not None: - return len(payload) <= link.MDU - # Fall back to packet creation if MDU not available - pkt = RNS.Packet(link, payload) - pkt.pack() - return True - except Exception: - return False - - def _queue_notice_chunks( - self, - outgoing: list[tuple[RNS.Link, bytes]], - link: RNS.Link, - *, - room: str | None, - text: str, - ) -> None: - if self.identity is None: - return - if not text: - return - - # Prefer splitting on lines for readability. If a single line is too - # large, further split it by characters using a pack preflight. - lines = text.splitlines() or [text] - for line in lines: - remaining = line - if not remaining: - continue - - # Start with a generous chunk size; shrink on demand. - max_chars = min(len(remaining), 512) - while remaining: - take = min(len(remaining), max_chars) - chunk = remaining[:take] - env = make_envelope( - T_NOTICE, - src=self.identity.hash, - room=room, - body=chunk, - ) - payload = encode(env) - if self._packet_would_fit(link, payload): - self._queue_payload(outgoing, link, payload) - remaining = remaining[take:] - max_chars = min(max_chars, 512) - continue - - if max_chars <= 1: - # Nothing we can do; avoid an infinite loop. - self.log.warning( - "NOTICE chunk would not fit MTU; dropping remainder (%s chars)", - len(remaining), - ) - break - - max_chars = max(1, max_chars // 2) - - def _queue_welcome( - self, - outgoing: list[tuple[RNS.Link, bytes]], - link: RNS.Link, - *, - peer_hash: Any, - motd: str | None, - ) -> None: - if self.identity is None: - return - - g = str(motd) if motd else "" - body_w: dict[int, Any] = { - B_WELCOME_HUB: self.config.hub_name, - B_WELCOME_VER: str(__version__), - } - # Capabilities are optional; keep WELCOME minimal unless needed. - - welcome = make_envelope(T_WELCOME, src=self.identity.hash, body=body_w) - welcome_payload = encode(welcome) - - if not self._packet_would_fit(link, welcome_payload): - self.log.warning( - "WELCOME would not fit MTU; cannot welcome peer=%s link_id=%s", - self._fmt_hash(peer_hash), - self._fmt_link_id(link), - ) - return - - self._queue_payload(outgoing, link, welcome_payload) - self.log.debug( - "Queued WELCOME peer=%s link_id=%s", - self._fmt_hash(peer_hash), - self._fmt_link_id(link), - ) - def _update_nick_index(self, link: RNS.Link, old_nick: str | None, new_nick: str | None) -> None: """Update nick index when a nick changes. Delegates to SessionManager.""" self.session_manager.update_nick_index(link, old_nick, new_nick) - # Resource transfer methods + # Resource transfer methods - delegates to message_helper for smart sending def _send_text_smart( self, @@ -212,115 +118,16 @@ class HubService: outgoing: list[tuple[RNS.Link, bytes]] | None = None, kind: str | None = None, ) -> None: - """ - Send text message using best method (packet or resource). - Falls back to chunking if resource transfer fails or is disabled. - - Args: - kind: Resource kind if sent via resource (default: RES_KIND_NOTICE) - """ - if self.identity is None: - return - - # Try encoding as a single packet first - env = make_envelope(msg_type, src=self.identity.hash, room=room, body=text) - payload = encode(env) - - # If it fits, send normally - if self._packet_would_fit(link, payload): - self.log.debug( - "Text fits in packet link_id=%s bytes=%s", - self._fmt_link_id(link), - len(payload), - ) - if outgoing is None: - self._send(link, env) - else: - self._queue_env(outgoing, link, env) - return - - self.log.debug( - "Text too large for packet link_id=%s bytes=%s mtu_check_failed=True", - self._fmt_link_id(link), - len(payload), + """Delegate to message_helper for smart text sending.""" + self.message_helper.send_text_smart( + link, + msg_type=msg_type, + text=text, + room=room, + kind=kind, + outgoing=outgoing, + encoding=encoding, ) - - # Too large for packet - try resource if enabled and type is NOTICE - # Only use resources when NOT batching (outgoing=None), since resource - # creation happens immediately and would race with queued packets. - text_bytes = text.encode(encoding) - can_use_resource = ( - self.config.enable_resource_transfer - and msg_type == T_NOTICE - and outgoing is None - and len(text_bytes) <= self.config.max_resource_bytes - ) - - self.log.debug( - "Resource check: enabled=%s type_is_notice=%s not_batching=%s size_ok=%s/%s", - self.config.enable_resource_transfer, - msg_type == T_NOTICE, - outgoing is None, - len(text_bytes), - self.config.max_resource_bytes, - ) - - if can_use_resource: - self.log.debug( - "Attempting to send via resource link_id=%s kind=%s", - self._fmt_link_id(link), - kind if kind is not None else RES_KIND_NOTICE, - ) - resource_kind = kind if kind is not None else RES_KIND_NOTICE - if self.resource_manager.send_via_resource( - link, - kind=resource_kind, - payload=text_bytes, - room=room, - encoding=encoding, - ): - self.log.debug( - "Sent large text via resource link_id=%s kind=%s chars=%s", - self._fmt_link_id(link), - resource_kind, - len(text), - ) - return - else: - self.log.warning( - "Resource send failed, falling back to chunks link_id=%s", - self._fmt_link_id(link), - ) - - # Fall back to chunking for NOTICE - if msg_type == T_NOTICE: - self.log.debug( - "Falling back to chunking link_id=%s outgoing_is_none=%s", - self._fmt_link_id(link), - outgoing is None, - ) - if outgoing is None: - outgoing = [] - self._queue_notice_chunks(outgoing, link, room=room, text=text) - for out_link, chunk_payload in outgoing: - self.stats_manager.inc("bytes_out", len(chunk_payload)) - try: - RNS.Packet(out_link, chunk_payload).send() - except Exception as e: - self.log.warning( - "Failed to send chunk link_id=%s: %s", - self._fmt_link_id(out_link), - e, - ) - else: - self._queue_notice_chunks(outgoing, link, room=room, text=text) - else: - # For other message types, just drop or log error - self.log.error( - "Message too large and not NOTICE link_id=%s type=%s", - self._fmt_link_id(link), - msg_type, - ) def start(self) -> None: self.log.info("Starting Reticulum") @@ -705,55 +512,6 @@ class HubService: for room in rooms_to_prune: self.log.info("Pruned unused registered room %s", room) - def _notice_to(self, link: RNS.Link, room: str | None, text: str) -> None: - if self.identity is None: - return - env = make_envelope(T_NOTICE, src=self.identity.hash, room=room, body=text) - self._send(link, env) - - def _queue_payload( - self, outgoing: list[tuple[RNS.Link, bytes]], link: RNS.Link, payload: bytes - ) -> None: - self.stats_manager.inc("bytes_out", len(payload)) - outgoing.append((link, payload)) - - def _queue_env( - self, outgoing: list[tuple[RNS.Link, bytes]], link: RNS.Link, env: dict - ) -> None: - payload = encode(env) - self._queue_payload(outgoing, link, payload) - - def _emit_notice( - self, - outgoing: list[tuple[RNS.Link, bytes]] | None, - link: RNS.Link, - room: str | None, - text: str, - ) -> None: - if self.identity is None: - return - env = make_envelope(T_NOTICE, src=self.identity.hash, room=room, body=text) - if outgoing is None: - self._send(link, env) - else: - self._queue_env(outgoing, link, env) - - def _emit_error( - self, - outgoing: list[tuple[RNS.Link, bytes]] | None, - link: RNS.Link, - *, - src: bytes, - text: str, - room: str | None = None, - ) -> None: - self.stats_manager.inc("errors_sent") - env = make_envelope(T_ERROR, src=src, room=room, body=text) - if outgoing is None: - self._send(link, env) - else: - self._queue_env(outgoing, link, env) - def _on_link(self, link: RNS.Link) -> None: with self._state_lock: self.session_manager.on_link_established(link) @@ -865,30 +623,47 @@ class HubService: ) def _send(self, link: RNS.Link, env: dict) -> None: - payload = encode(env) - self.stats_manager.inc("bytes_out", len(payload)) - try: - RNS.Packet(link, payload).send() - except OSError as e: - # Common failure mode on low-MTU links: packet too large. - self.log.warning( - "Send failed link_id=%s bytes=%s err=%s", - self._fmt_link_id(link), - len(payload), - e, - ) - except Exception: - self.log.debug( - "Send failed link_id=%s bytes=%s", - self._fmt_link_id(link), - len(payload), - exc_info=True, - ) + """Delegate to message_helper for sending.""" + self.message_helper.send(link, env) def _error( self, link: RNS.Link, src: bytes, text: str, room: str | None = None ) -> None: - self._emit_error(None, link, src=src, text=text, room=room) + """Delegate to message_helper for error sending.""" + self.message_helper.error(link, src, text, room) + + def _emit_error( + self, + outgoing: list[tuple[RNS.Link, bytes]] | None, + link: RNS.Link, + *, + src: bytes, + text: str, + room: str | None = None, + ) -> None: + """Delegate to message_helper for error emission.""" + self.message_helper.emit_error(outgoing, link, src=src, text=text, room=room) + + def _emit_notice( + self, + outgoing: list[tuple[RNS.Link, bytes]] | None, + link: RNS.Link, + room: str | None, + text: str, + ) -> None: + """Delegate to message_helper for notice emission.""" + self.message_helper.emit_notice(outgoing, link, room, text) + + def _queue_welcome( + self, + outgoing: list[tuple[RNS.Link, bytes]], + link: RNS.Link, + *, + peer_hash: Any, + motd: str | None, + ) -> None: + """Delegate to message_helper for queuing welcome.""" + self.message_helper.queue_welcome(outgoing, link, peer_hash=peer_hash, motd=motd) def _norm_room(self, room: str) -> str: r = room.strip().lower() diff --git a/rrcd/session.py b/rrcd/session.py index 7ad5813..a62b716 100644 --- a/rrcd/session.py +++ b/rrcd/session.py @@ -90,7 +90,7 @@ class SessionManager: self._index_by_hash[bytes(peer_hash)] = link # Check if banned - banned = bytes(peer_hash) in self.hub._banned + banned = self.hub.trust_manager.is_banned(bytes(peer_hash)) if not banned: self.log.info( diff --git a/rrcd/trust.py b/rrcd/trust.py index 1990c8b..c5f37ca 100644 --- a/rrcd/trust.py +++ b/rrcd/trust.py @@ -112,7 +112,7 @@ class TrustManager: """Persist the current banned identities list to the config file.""" cfg_path = self.hub.config_manager.get_config_path_for_writes() if not cfg_path: - self.hub._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, "ban updated (not persisted; no config_path)" ) return @@ -120,7 +120,7 @@ class TrustManager: try: from tomlkit import dumps, parse, table # type: ignore except Exception: - self.hub._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, @@ -171,6 +171,6 @@ class TrustManager: except Exception: pass except Exception as e: - self.hub._emit_notice( + self.hub.message_helper.emit_notice( outgoing, link, room, f"ban updated (persist failed: {e})" )