Merge pull request #11 from kc1awv/refactors

Refactors
This commit is contained in:
Steve Miller
2026-01-07 17:14:08 -05:00
committed by GitHub
17 changed files with 4425 additions and 3712 deletions
+19
View File
@@ -2,6 +2,25 @@
This project follows the versioning policy in VERSIONING.md.
## 0.2.0 - 2026-01-07
- **Major internal refactoring**: Improved code organization and maintainability
- Extracted modular components from monolithic service class:
- `SessionManager`: Centralized session lifecycle and state management
- `MessageRouter`: Message routing and forwarding logic
- `CommandHandler`: Slash-command parsing and execution
- `RoomManager`: Room state, membership, and mode management
- `ResourceManager`: RNS.Resource transfer handling and coordination
- `TrustManager`: Operator and ban list management
- `StatsManager`: Statistics tracking and reporting
- `ConfigManager`: Enhanced configuration loading and validation
- Moved message chunking and encoding logic to dedicated `messages` module
- Consolidated constants and improved code organization
- Reduced service.py from ~4000 lines to <600 lines by delegating to specialized managers
- No breaking changes to protocol, configuration format, or user-facing behavior
Future development will focus on testing, feature enhancements, and optimizations rather than large structural changes.
## 0.1.3 - 2026-01-05
- Added `/list` command to discover registered public rooms with their topics (available to all users)
+1 -1
View File
@@ -49,7 +49,7 @@ version = {attr = "rrcd.__version__"}
[tool.ruff]
target-version = "py311"
line-length = 88
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "B", "UP"]
+1 -1
View File
@@ -1,3 +1,3 @@
__all__ = ["__version__"]
__version__ = "0.1.3"
__version__ = "0.2.0"
+12 -5
View File
@@ -4,6 +4,7 @@ import argparse
import os
import sys
from dataclasses import asdict, replace
from pathlib import Path
import RNS
@@ -77,11 +78,11 @@ def _apply_config_file(cfg: HubRuntimeConfig, path: str) -> HubRuntimeConfig:
def _write_default_config(config_path: str, identity_path: str) -> None:
cfg_dir = os.path.dirname(config_path)
if cfg_dir:
ensure_private_dir(__import__("pathlib").Path(cfg_dir))
ensure_private_dir(Path(cfg_dir))
storage_dir = os.path.dirname(identity_path)
if storage_dir:
ensure_private_dir(__import__("pathlib").Path(storage_dir))
ensure_private_dir(Path(storage_dir))
room_registry_path = str(default_room_registry_path())
@@ -208,7 +209,7 @@ def _ensure_first_run_files(
if not os.path.exists(identity_path):
storage_dir = os.path.dirname(identity_path)
if storage_dir:
ensure_private_dir(__import__("pathlib").Path(storage_dir))
ensure_private_dir(Path(storage_dir))
ident = RNS.Identity()
ident.to_file(identity_path)
try:
@@ -220,7 +221,7 @@ def _ensure_first_run_files(
if room_registry_path and not os.path.exists(room_registry_path):
storage_dir = os.path.dirname(room_registry_path)
if storage_dir:
ensure_private_dir(__import__("pathlib").Path(storage_dir))
ensure_private_dir(Path(storage_dir))
content = """# rrcd room registry (TOML)
#
# This file stores registered rooms and their moderation state.
@@ -382,8 +383,14 @@ def main(argv: list[str] | None = None) -> None:
cfg = replace(cfg, config_path=config_path)
cfg = replace(cfg, room_registry_path=room_registry_path)
# Use ConfigManager to load config file
if config_path:
cfg = _apply_config_file(cfg, config_path)
from .config import ConfigManager
# Create temporary manager for loading
temp_hub = type('obj', (object,), {'config': cfg, 'log': None, '_state_lock': None})()
temp_mgr = ConfigManager(temp_hub) # type: ignore
data = temp_mgr.load_toml(config_path)
cfg = temp_mgr.apply_config_data(cfg, data)
if args.dest_name is not None:
cfg = replace(cfg, dest_name=args.dest_name)
+1204
View File
File diff suppressed because it is too large Load Diff
+126 -2
View File
@@ -1,6 +1,11 @@
from __future__ import annotations
from dataclasses import dataclass
import threading
from dataclasses import asdict, dataclass, replace
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from .service import HubService
@dataclass(frozen=True)
@@ -26,7 +31,7 @@ class HubRuntimeConfig:
rate_limit_msgs_per_minute: int = 240
ping_interval_s: float = 0.0
ping_timeout_s: float = 0.0
max_resource_bytes: int = 256 * 1024 # 256 KiB default
max_resource_bytes: int = 256 * 1024
max_pending_resource_expectations: int = 8
resource_expectation_ttl_s: float = 30.0
enable_resource_transfer: bool = True
@@ -36,3 +41,122 @@ class HubRuntimeConfig:
log_file: str | None = None
log_format: str = "%(asctime)s %(levelname)s %(name)s[%(threadName)s]: %(message)s"
log_datefmt: str | None = None
class ConfigManager:
"""
Manages hub configuration loading, reloading, and persistence.
Handles:
- Loading TOML configuration files
- Applying configuration updates
- Reloading configuration at runtime
- Config diffing and comparison
- Config file path resolution
"""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = hub.log
self._write_lock = threading.Lock()
def load_toml(self, path: str) -> dict:
"""Load a TOML file and return its contents as a dictionary."""
import tomllib
with open(path, "rb") as f:
data = tomllib.load(f)
return data if isinstance(data, dict) else {}
def apply_config_data(self, base: HubRuntimeConfig, data: dict) -> HubRuntimeConfig:
"""Apply configuration data from TOML to a runtime config instance."""
hub = data.get("hub") if isinstance(data, dict) else None
if isinstance(hub, dict):
data = {**data, **hub}
log_table = data.get("logging") if isinstance(data, dict) else None
if isinstance(log_table, dict):
mapped: dict[str, object] = {}
if "level" in log_table:
mapped["log_level"] = log_table.get("level")
if "rns_level" in log_table:
mapped["log_rns_level"] = log_table.get("rns_level")
if "console" in log_table:
mapped["log_console"] = log_table.get("console")
if "file" in log_table:
mapped["log_file"] = log_table.get("file")
if "format" in log_table:
mapped["log_format"] = log_table.get("format")
if "datefmt" in log_table:
mapped["log_datefmt"] = log_table.get("datefmt")
data = {**data, **mapped}
allowed = set(asdict(base).keys())
allowed.discard("config_path")
updates = {k: v for k, v in data.items() if k in allowed}
for list_key in ("trusted_identities", "banned_identities"):
if list_key in updates and isinstance(updates[list_key], list):
updates[list_key] = tuple(str(x) for x in updates[list_key])
if "announce" in data and "announce_on_start" not in updates:
try:
updates["announce_on_start"] = bool(data["announce"])
except Exception:
pass
if "configdir" in updates and updates["configdir"] == "":
updates["configdir"] = None
if "greeting" in updates and updates["greeting"] == "":
updates["greeting"] = None
if "log_file" in updates and updates["log_file"] == "":
updates["log_file"] = None
if "log_datefmt" in updates and updates["log_datefmt"] == "":
updates["log_datefmt"] = None
return replace(base, **updates) if updates else base
def format_reload_value(self, v: Any) -> str:
"""Format a config value for display in reload summaries."""
if v is None:
return "(none)"
if isinstance(v, (bool, int, float)):
return str(v)
if isinstance(v, (tuple, list, set)):
return f"len={len(v)}"
s = str(v)
s = " ".join(s.split())
if len(s) > 80:
s = s[:77] + "..."
return s
def diff_config_summary(
self, old: HubRuntimeConfig, new: HubRuntimeConfig
) -> list[str]:
"""Generate a summary of differences between two config instances."""
old_d = asdict(old)
new_d = asdict(new)
old_d.pop("config_path", None)
new_d.pop("config_path", None)
changed: list[str] = []
for k in sorted(new_d.keys()):
if old_d.get(k) == new_d.get(k):
continue
changed.append(
f"{k}: {self.format_reload_value(old_d.get(k))} -> {self.format_reload_value(new_d.get(k))}"
)
return changed
def get_config_path_for_writes(self) -> str | None:
"""Get the resolved config file path for write operations."""
from .util import expand_path
p = self.hub.config.config_path
if not p:
return None
return expand_path(str(p))
def get_write_lock(self) -> threading.Lock:
"""Get the lock used for config file write operations."""
return self._write_lock
-3
View File
@@ -85,11 +85,8 @@ def validate_envelope(env: dict) -> None:
room = env[K_ROOM]
if not isinstance(room, str):
raise TypeError("room name must be a string")
# Per RRC spec, room field may be empty (e.g., for hub commands)
if K_NICK in env:
nick = env[K_NICK]
if not isinstance(nick, str):
raise TypeError("nickname must be a string")
# Per spec, nicknames are advisory and may be empty or "ridiculous".
# Type-check only; implementations may sanitize/ignore for display.
-1
View File
@@ -99,7 +99,6 @@ def configure_logging(
root.setLevel(level)
# Library loggers
logging.getLogger("RNS").setLevel(rns_level)
logging.captureWarnings(True)
+294
View File
@@ -0,0 +1,294 @@
"""Message sending and queueing utilities for the RRC hub."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import RNS
from .codec import encode
from .constants import B_WELCOME_HUB, B_WELCOME_VER, T_ERROR, T_NOTICE, T_WELCOME
from .envelope import make_envelope
if TYPE_CHECKING:
from .service import HubService
# Maximum characters per NOTICE chunk for MTU-safe delivery
MAX_NOTICE_CHUNK_CHARS = 512
class MessageHelper:
"""
Helper methods for sending and queueing messages.
Handles:
- Message queueing (outgoing lists)
- Notice chunking for large messages
- WELCOME message construction
- Error and notice emission
- Smart text sending (resource vs chunks)
"""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = hub.log
def packet_would_fit(self, link: RNS.Link, payload: bytes) -> bool:
"""Check if payload fits within link MDU without creating/packing packets."""
try:
if hasattr(link, "MDU") and link.MDU is not None:
return len(payload) <= link.MDU
pkt = RNS.Packet(link, payload)
pkt.pack()
return True
except Exception:
return False
def queue_payload(
self, outgoing: list[tuple[RNS.Link, bytes]], link: RNS.Link, payload: bytes
) -> None:
"""Add a raw payload to the outgoing queue."""
self.hub.stats_manager.inc("bytes_out", len(payload))
outgoing.append((link, payload))
def queue_env(
self, outgoing: list[tuple[RNS.Link, bytes]], link: RNS.Link, env: dict
) -> None:
"""Encode and queue an envelope."""
payload = encode(env)
self.queue_payload(outgoing, link, payload)
def queue_notice_chunks(
self,
outgoing: list[tuple[RNS.Link, bytes]],
link: RNS.Link,
*,
room: str | None,
text: str,
) -> None:
"""Split and queue a notice message into MTU-sized chunks."""
if self.hub.identity is None:
return
if not text:
return
lines = text.splitlines() or [text]
for line in lines:
remaining = line
if not remaining:
continue
max_chars = min(len(remaining), MAX_NOTICE_CHUNK_CHARS)
while remaining:
take = min(len(remaining), max_chars)
chunk = remaining[:take]
env = make_envelope(
T_NOTICE,
src=self.hub.identity.hash,
room=room,
body=chunk,
)
payload = encode(env)
if self.packet_would_fit(link, payload):
self.queue_payload(outgoing, link, payload)
remaining = remaining[take:]
max_chars = min(max_chars, MAX_NOTICE_CHUNK_CHARS)
continue
if max_chars <= 1:
self.log.warning(
"NOTICE chunk would not fit MTU; dropping remainder (%s chars)",
len(remaining),
)
break
max_chars = max(1, max_chars // 2)
def queue_welcome(
self,
outgoing: list[tuple[RNS.Link, bytes]],
link: RNS.Link,
*,
peer_hash: Any,
motd: str | None,
) -> None:
"""Queue a WELCOME message for a newly connected peer."""
if self.hub.identity is None:
return
from . import __version__
body_w: dict[int, Any] = {
B_WELCOME_HUB: self.hub.config.hub_name,
B_WELCOME_VER: str(__version__),
}
welcome = make_envelope(T_WELCOME, src=self.hub.identity.hash, body=body_w)
welcome_payload = encode(welcome)
if not self.packet_would_fit(link, welcome_payload):
self.log.warning(
"WELCOME would not fit MTU; cannot welcome peer=%s link_id=%s",
self.hub._fmt_hash(peer_hash),
self.hub._fmt_link_id(link),
)
return
self.queue_payload(outgoing, link, welcome_payload)
self.log.debug(
"Queued WELCOME peer=%s link_id=%s",
self.hub._fmt_hash(peer_hash),
self.hub._fmt_link_id(link),
)
def send_text_smart(
self,
link: RNS.Link,
*,
msg_type: int,
text: str,
room: str | None = None,
kind: str | None = None,
outgoing: list[tuple[RNS.Link, bytes]] | None = None,
encoding: str = "utf-8",
) -> None:
"""
Send text message using the most efficient method:
- Resource transfer for large messages (if enabled and outgoing is None)
- Chunked messages otherwise
"""
from .constants import RES_KIND_MOTD, RES_KIND_NOTICE
resource_kind = kind
if resource_kind is None:
resource_kind = (
RES_KIND_MOTD
if msg_type == T_NOTICE and room is None
else RES_KIND_NOTICE
)
if (
self.hub.config.enable_resource_transfer
and outgoing is None
and len(text.encode(encoding, errors="replace")) > 512
):
self.log.debug(
"Attempting resource transfer link_id=%s kind=%s chars=%s",
self.hub._fmt_link_id(link),
resource_kind,
len(text),
)
if self.hub.resource_manager.send_via_resource(
link,
kind=resource_kind,
payload=text.encode(encoding, errors="replace"),
room=room,
encoding=encoding,
):
self.log.debug(
"Sent large text via resource link_id=%s kind=%s chars=%s",
self.hub._fmt_link_id(link),
resource_kind,
len(text),
)
return
else:
self.log.warning(
"Resource send failed, falling back to chunks link_id=%s",
self.hub._fmt_link_id(link),
)
if msg_type == T_NOTICE:
self.log.debug(
"Falling back to chunking link_id=%s outgoing_is_none=%s",
self.hub._fmt_link_id(link),
outgoing is None,
)
if outgoing is None:
outgoing = []
self.queue_notice_chunks(outgoing, link, room=room, text=text)
for out_link, chunk_payload in outgoing:
self.hub.stats_manager.inc("bytes_out", len(chunk_payload))
try:
RNS.Packet(out_link, chunk_payload).send()
except Exception as e:
self.log.warning(
"Failed to send chunk link_id=%s: %s",
self.hub._fmt_link_id(out_link),
e,
)
else:
self.queue_notice_chunks(outgoing, link, room=room, text=text)
else:
self.log.error(
"Message too large and not NOTICE link_id=%s type=%s",
self.hub._fmt_link_id(link),
msg_type,
)
def emit_notice(
self,
outgoing: list[tuple[RNS.Link, bytes]] | None,
link: RNS.Link,
room: str | None,
text: str,
) -> None:
"""Emit a notice message (queued or immediate)."""
if self.hub.identity is None:
return
env = make_envelope(T_NOTICE, src=self.hub.identity.hash, room=room, body=text)
if outgoing is None:
self.send(link, env)
else:
self.queue_env(outgoing, link, env)
def emit_error(
self,
outgoing: list[tuple[RNS.Link, bytes]] | None,
link: RNS.Link,
*,
src: bytes,
text: str,
room: str | None = None,
) -> None:
"""Emit an error message (queued or immediate)."""
self.hub.stats_manager.inc("errors_sent")
env = make_envelope(T_ERROR, src=src, room=room, body=text)
if outgoing is None:
self.send(link, env)
else:
self.queue_env(outgoing, link, env)
def notice_to(self, link: RNS.Link, room: str | None, text: str) -> None:
"""Send a notice message immediately."""
if self.hub.identity is None:
return
env = make_envelope(T_NOTICE, src=self.hub.identity.hash, room=room, body=text)
self.send(link, env)
def error(
self, link: RNS.Link, src: bytes, text: str, room: str | None = None
) -> None:
"""Send an error message immediately."""
self.emit_error(None, link, src=src, text=text, room=room)
def send(self, link: RNS.Link, env: dict) -> None:
"""Send an envelope immediately (not queued)."""
payload = encode(env)
self.hub.stats_manager.inc("bytes_out", len(payload))
try:
RNS.Packet(link, payload).send()
except OSError as e:
self.log.warning(
"Send failed link_id=%s bytes=%s err=%s",
self.hub._fmt_link_id(link),
len(payload),
e,
)
except Exception:
self.log.debug(
"Send failed link_id=%s bytes=%s",
self.hub._fmt_link_id(link),
len(payload),
exc_info=True,
)
+563
View File
@@ -0,0 +1,563 @@
"""Resource transfer management for RRCD."""
from __future__ import annotations
import hashlib
import os
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING
import RNS
from rrcd.codec import encode
from rrcd.constants import (
B_RES_ENCODING,
B_RES_ID,
B_RES_KIND,
B_RES_SHA256,
B_RES_SIZE,
RES_KIND_BLOB,
RES_KIND_MOTD,
RES_KIND_NOTICE,
T_NOTICE,
T_RESOURCE_ENVELOPE,
)
from rrcd.envelope import make_envelope
if TYPE_CHECKING:
from rrcd.service import HubService
@dataclass
class _ResourceExpectation:
"""Tracks an expected incoming Resource transfer."""
id: bytes
kind: str
size: int
sha256: bytes | None
encoding: str | None
created_at: float
expires_at: float
room: str | None = None
class ResourceManager:
"""Manages RNS Resource transfers for the hub."""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = hub.log
self._resource_expectations: dict[
RNS.Link, dict[bytes, _ResourceExpectation]
] = {}
self._active_resources: dict[RNS.Link, set[RNS.Resource]] = {}
self._resource_bindings: dict[RNS.Resource, bytes] = {}
def on_link_established(self, link: RNS.Link) -> None:
"""Initialize resource tracking for a new link."""
self._resource_expectations[link] = {}
self._active_resources[link] = set()
def on_link_closed(self, link: RNS.Link) -> None:
"""Clean up resource state when a link closes."""
self._resource_expectations.pop(link, None)
self._active_resources.pop(link, None)
def clear_all(self) -> None:
"""Clear all resource state (called during shutdown)."""
self._resource_expectations.clear()
self._active_resources.clear()
def configure_link_callbacks(self, link: RNS.Link) -> None:
"""Set up resource callbacks for a link if resource transfer is enabled."""
if not self.hub.config.enable_resource_transfer:
return
try:
link.set_resource_strategy(RNS.Link.ACCEPT_APP)
link.set_resource_callback(self._resource_advertised)
link.set_resource_concluded_callback(self._resource_concluded)
self.log.debug(
"Resource callbacks configured link_id=%s",
self.hub._fmt_link_id(link),
)
except Exception as e:
self.log.warning(
"Failed to set resource callbacks link_id=%s: %s",
self.hub._fmt_link_id(link),
e,
)
def cleanup_expired_expectations(self, link: RNS.Link) -> None:
"""Remove expired resource expectations for a link."""
now = time.time()
exp_dict = self._resource_expectations.get(link)
if not exp_dict:
return
expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now]
for rid in expired:
exp_dict.pop(rid, None)
self.log.debug(
"Expired resource expectation link_id=%s rid=%s",
self.hub._fmt_link_id(link),
rid.hex() if isinstance(rid, bytes) else rid,
)
def cleanup_all_expired_expectations(self) -> None:
"""Cleanup expired resource expectations across all links."""
now = time.time()
with self.hub._state_lock:
for link, exp_dict in list(self._resource_expectations.items()):
if not exp_dict:
continue
expired = [
rid for rid, exp in exp_dict.items() if exp.expires_at <= now
]
for rid in expired:
exp_dict.pop(rid, None)
self.log.debug(
"Expired resource expectation link_id=%s rid=%s",
self.hub._fmt_link_id(link),
rid.hex() if isinstance(rid, bytes) else rid,
)
def add_resource_expectation(
self,
link: RNS.Link,
*,
rid: bytes,
kind: str,
size: int,
sha256: bytes | None = None,
encoding: str | None = None,
room: str | None = None,
) -> bool:
"""Add a resource expectation. Returns False if limit exceeded."""
self.cleanup_expired_expectations(link)
exp_dict = self._resource_expectations.setdefault(link, {})
if len(exp_dict) >= self.hub.config.max_pending_resource_expectations:
self.log.warning(
"Max pending expectations exceeded link_id=%s",
self.hub._fmt_link_id(link),
)
return False
now = time.time()
exp = _ResourceExpectation(
id=rid,
kind=kind,
size=size,
sha256=sha256,
encoding=encoding,
created_at=now,
expires_at=now + self.hub.config.resource_expectation_ttl_s,
room=room,
)
exp_dict[rid] = exp
self.log.debug(
"Added resource expectation link_id=%s rid=%s kind=%s size=%s",
self.hub._fmt_link_id(link),
rid.hex(),
kind,
size,
)
return True
def find_resource_expectation(
self, link: RNS.Link, size: int
) -> _ResourceExpectation | None:
"""Find a matching resource expectation by size (fallback matching)."""
self.cleanup_expired_expectations(link)
exp_dict = self._resource_expectations.get(link)
if not exp_dict:
return None
for exp in exp_dict.values():
if exp.size == size:
return exp
return None
def get_resource_expectation_by_rid(
self, link: RNS.Link, rid: bytes
) -> _ResourceExpectation | None:
"""Lookup an expectation by RID without removing it."""
exp_dict = self._resource_expectations.get(link)
if not exp_dict:
return None
return exp_dict.get(rid)
def match_resource_expectation(
self, link: RNS.Link, *, rid: bytes | None, size: int, sha256: bytes | None
) -> _ResourceExpectation | None:
"""Find the expectation that should satisfy a completed resource.
Preference order:
1) Bound RID (from advertisement) when available.
2) Exact RID lookup.
3) Fallback: first size match whose sha256 (if present) matches.
"""
self.cleanup_expired_expectations(link)
if rid is not None:
exp = self.get_resource_expectation_by_rid(link, rid)
if exp is not None:
return exp
exp_dict = self._resource_expectations.get(link)
if not exp_dict:
return None
for exp in exp_dict.values():
if exp.size != size:
continue
if exp.sha256 and sha256 and exp.sha256 != sha256:
continue
return exp
return None
def pop_resource_expectation(
self, link: RNS.Link, rid: bytes
) -> _ResourceExpectation | None:
"""Remove and return a resource expectation."""
exp_dict = self._resource_expectations.get(link)
if not exp_dict:
return None
return exp_dict.pop(rid, None)
def _resource_advertised(self, resource: RNS.Resource) -> bool:
"""
Callback when a Resource is advertised by remote peer.
Returns True to accept, False to reject.
Minimize lock scope to prevent potential deadlocks with RNS internal locks.
"""
link = resource.link
if not self.hub.config.enable_resource_transfer:
self.log.debug(
"Rejecting resource (disabled) link_id=%s",
self.hub._fmt_link_id(link),
)
self.hub.stats_manager.inc("resources_rejected")
return False
size = resource.total_size if hasattr(resource, "total_size") else resource.size
if size > self.hub.config.max_resource_bytes:
self.log.warning(
"Rejecting resource (too large: %s > %s) link_id=%s",
size,
self.hub.config.max_resource_bytes,
self.hub._fmt_link_id(link),
)
self.hub.stats_manager.inc("resources_rejected")
return False
with self.hub._state_lock:
sess = self.hub.session_manager.sessions.get(link)
if not sess:
self.log.debug(
"Rejecting resource (no session) link_id=%s",
self.hub._fmt_link_id(link),
)
self.hub.stats_manager.inc("resources_rejected")
return False
exp = self.find_resource_expectation(link, size)
if not exp:
self.log.warning(
"Rejecting resource (no matching expectation) link_id=%s size=%s",
self.hub._fmt_link_id(link),
size,
)
self.hub.stats_manager.inc("resources_rejected")
return False
self.log.info(
"Accepting resource link_id=%s size=%s kind=%s",
self.hub._fmt_link_id(link),
size,
exp.kind,
)
with self.hub._state_lock:
self._active_resources.setdefault(link, set()).add(resource)
self._resource_bindings[resource] = exp.id
return True
def _resource_concluded(self, resource: RNS.Resource) -> None:
"""Callback when a Resource transfer completes."""
link = resource.link
with self.hub._state_lock:
active_set = self._active_resources.get(link)
if active_set:
active_set.discard(resource)
bound_rid = self._resource_bindings.pop(resource, None)
if resource.status != RNS.Resource.COMPLETE:
self.log.warning(
"Resource transfer failed link_id=%s status=%s",
self.hub._fmt_link_id(link),
resource.status,
)
return
try:
payload = (
resource.data.read()
if hasattr(resource.data, "read")
else resource.data
)
if isinstance(payload, bytearray):
payload = bytes(payload)
except Exception as e:
self.log.error(
"Failed to read resource data link_id=%s: %s",
self.hub._fmt_link_id(link),
e,
)
return
size = len(payload)
actual_hash = hashlib.sha256(payload).digest()
exp = self.match_resource_expectation(
link, rid=bound_rid, size=size, sha256=actual_hash
)
if not exp:
self.log.warning(
"Received resource without expectation link_id=%s size=%s",
self.hub._fmt_link_id(link),
size,
)
return
if exp.sha256 and actual_hash != exp.sha256:
self.log.error(
"Resource SHA256 mismatch link_id=%s expected=%s actual=%s",
self.hub._fmt_link_id(link),
exp.sha256.hex(),
actual_hash.hex(),
)
return
self.pop_resource_expectation(link, exp.id)
self.hub.stats_manager.inc("resources_received")
self.hub.stats_manager.inc("resource_bytes_received", size)
self.log.info(
"Resource received link_id=%s size=%s kind=%s",
self.hub._fmt_link_id(link),
size,
exp.kind,
)
try:
self._dispatch_received_resource(link, exp, payload)
except Exception as e:
self.log.exception(
"Failed to dispatch resource link_id=%s kind=%s: %s",
self.hub._fmt_link_id(link),
exp.kind,
e,
)
def _dispatch_received_resource(
self, link: RNS.Link, exp: _ResourceExpectation, payload: bytes
) -> None:
"""Dispatch a received resource payload to appropriate handler."""
if exp.kind == RES_KIND_NOTICE:
encoding = exp.encoding or "utf-8"
try:
text = payload.decode(encoding)
except Exception as e:
self.log.error(
"Failed to decode notice resource link_id=%s encoding=%s: %s",
self.hub._fmt_link_id(link),
encoding,
e,
)
return
self.log.info(
"Received large NOTICE via resource link_id=%s room=%r chars=%s",
self.hub._fmt_link_id(link),
exp.room,
len(text),
)
if exp.room and self.hub.identity is not None:
with self.hub._state_lock:
sess = self.hub.session_manager.sessions.get(link)
peer_hash = sess.get("peer") if sess else None
room_members = self.hub.room_manager.get_room_members(exp.room)
if peer_hash and room_members:
notice_env = make_envelope(
T_NOTICE,
src=peer_hash,
room=exp.room,
body=text,
)
notice_payload = encode(notice_env)
forwarded = 0
for other in room_members:
if other != link:
try:
other.packet(notice_payload)
forwarded += 1
except Exception as e:
self.log.warning(
"Failed to forward NOTICE resource link_id=%s: %s",
self.hub._fmt_link_id(other),
e,
)
if forwarded > 0:
self.hub.stats_manager.inc("notices_forwarded")
self.log.debug(
"Forwarded NOTICE resource to %d members room=%s",
forwarded,
exp.room,
)
elif exp.kind == RES_KIND_MOTD:
encoding = exp.encoding or "utf-8"
try:
text = payload.decode(encoding)
except Exception as e:
self.log.error(
"Failed to decode MOTD resource link_id=%s: %s",
self.hub._fmt_link_id(link),
e,
)
return
self.log.info(
"Received MOTD via resource link_id=%s chars=%s",
self.hub._fmt_link_id(link),
len(text),
)
elif exp.kind == RES_KIND_BLOB:
self.log.info(
"Received BLOB via resource link_id=%s bytes=%s",
self.hub._fmt_link_id(link),
len(payload),
)
else:
self.log.warning(
"Unknown resource kind link_id=%s kind=%s",
self.hub._fmt_link_id(link),
exp.kind,
)
def send_via_resource(
self,
link: RNS.Link,
*,
kind: str,
payload: bytes,
room: str | None = None,
encoding: str | None = None,
) -> bool:
"""
Send large payload via Resource.
Returns True if successfully initiated, False otherwise.
Note: This sends the resource envelope immediately, then creates
and advertises the resource. Should only be called when immediate
sending is desired (not when batching messages).
"""
if not self.hub.config.enable_resource_transfer:
return False
size = len(payload)
if size > self.hub.config.max_resource_bytes:
self.log.error(
"Payload too large for resource transfer: %s > %s",
size,
self.hub.config.max_resource_bytes,
)
return False
rid = os.urandom(8)
sha256 = hashlib.sha256(payload).digest()
if self.hub.identity is None:
return False
envelope_body = {
B_RES_ID: rid,
B_RES_KIND: kind,
B_RES_SIZE: size,
B_RES_SHA256: sha256,
}
if encoding:
envelope_body[B_RES_ENCODING] = encoding
envelope = make_envelope(
T_RESOURCE_ENVELOPE,
src=self.hub.identity.hash,
room=room,
body=envelope_body,
)
try:
envelope_payload = encode(envelope)
RNS.Packet(link, envelope_payload).send()
self.hub.stats_manager.inc("bytes_out", len(envelope_payload))
self.log.debug(
"Sent resource envelope link_id=%s rid=%s kind=%s size=%s",
self.hub._fmt_link_id(link),
rid.hex(),
kind,
size,
)
except Exception as e:
self.log.error(
"Failed to send resource envelope link_id=%s: %s",
self.hub._fmt_link_id(link),
e,
)
return False
try:
resource = RNS.Resource(payload, link, advertise=True, auto_compress=False)
with self.hub._state_lock:
self._active_resources.setdefault(link, set()).add(resource)
self.hub.stats_manager.inc("resources_sent")
self.hub.stats_manager.inc("resource_bytes_sent", size)
self.log.info(
"Sent resource link_id=%s rid=%s kind=%s size=%s",
self.hub._fmt_link_id(link),
rid.hex(),
kind,
size,
)
return True
except Exception as e:
self.log.error(
"Failed to create resource link_id=%s: %s",
self.hub._fmt_link_id(link),
e,
)
return False
+694
View File
@@ -0,0 +1,694 @@
"""Room management for RRCD hub.
This module handles all room-related functionality including:
- Room membership tracking
- Room state (modes, topic, permissions)
- Room registry persistence to TOML
- Permission management (ops, voiced, bans)
- Invite tracking with expiration
"""
from __future__ import annotations
import logging
import os
import threading
import time
from typing import TYPE_CHECKING, Any
import RNS
if TYPE_CHECKING:
from .service import HubService
class RoomManager:
"""Manages room memberships, state, permissions, and registry persistence."""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = logging.getLogger("rrcd.rooms")
self.rooms: dict[str, set[RNS.Link]] = {}
self._room_state: dict[str, dict[str, Any]] = {}
self._room_registry: dict[str, dict[str, Any]] = {}
self._room_registry_write_lock = threading.Lock()
def clear_all(self) -> None:
"""Clear all room state. Called during hub shutdown."""
self.rooms.clear()
self._room_state.clear()
self._room_registry.clear()
def get_room_members(self, room: str) -> set[RNS.Link]:
"""Get set of links currently in a room."""
return self.rooms.get(room, set())
def add_member(
self, room: str, link: RNS.Link, *, founder: bytes | None = None
) -> None:
"""Add a link to a room, creating the room if needed."""
if room not in self.rooms:
self.rooms[room] = set()
self._room_state_ensure(room, founder=founder)
self.rooms.setdefault(room, set()).add(link)
def remove_member(self, room: str, link: RNS.Link) -> None:
"""Remove a link from a room, cleaning up empty rooms."""
if room in self.rooms:
self.rooms[room].discard(link)
if not self.rooms[room]:
self.rooms.pop(room, None)
st = self._room_state_get(room)
if st is not None and not st.get("registered"):
self._room_state.pop(room, None)
def remove_member_from_all(self, link: RNS.Link) -> int:
"""Remove a link from all rooms. Returns number of rooms left."""
rooms_to_remove = [r for r, links in self.rooms.items() if link in links]
for room in rooms_to_remove:
self.remove_member(room, link)
return len(rooms_to_remove)
def get_member_rooms(self, link: RNS.Link) -> list[str]:
"""Get list of rooms a link is currently in."""
return [room for room, links in self.rooms.items() if link in links]
def get_stats(self) -> dict[str, Any]:
"""Get room statistics for hub stats."""
rooms_total = len(self.rooms)
memberships = sum(len(v) for v in self.rooms.values())
top_rooms = sorted(
((room, len(links)) for room, links in self.rooms.items()),
key=lambda x: (-x[1], x[0]),
)[:5]
return {
"rooms_total": rooms_total,
"memberships": memberships,
"top_rooms": top_rooms,
}
def _room_state_get(self, room: str) -> dict[str, Any] | None:
"""Get room state dict if it exists."""
return self._room_state.get(room)
def _room_state_ensure(
self, room: str, *, founder: bytes | None = None
) -> dict[str, Any]:
"""Ensure room state exists, creating from registry or defaults."""
st = self._room_state.get(room)
if st is not None:
if st.get("founder") is None and founder is not None:
st["founder"] = founder
st.setdefault("ops", set()).add(founder)
return st
if room in self._room_registry:
base = self._room_registry[room]
invited = base.get("invited")
invited_dict: dict[bytes, float] = {}
if isinstance(invited, dict):
for k, v in invited.items():
if isinstance(k, (bytes, bytearray)):
try:
invited_dict[bytes(k)] = float(v)
except Exception:
continue
st = {
"founder": base.get("founder"),
"registered": True,
"topic": base.get("topic"),
"moderated": bool(base.get("moderated", False)),
"invite_only": bool(base.get("invite_only", False)),
"topic_ops_only": bool(base.get("topic_ops_only", False)),
"no_outside_msgs": bool(base.get("no_outside_msgs", False)),
"private": bool(base.get("private", False)),
"key": base.get("key"),
"ops": set(base.get("ops", set())),
"voiced": set(base.get("voiced", set())),
"bans": set(base.get("bans", set())),
"invited": invited_dict,
"last_used_ts": base.get("last_used_ts"),
}
self._room_state[room] = st
return st
st = {
"founder": founder,
"registered": False,
"topic": None,
"moderated": False,
"invite_only": False,
"topic_ops_only": False,
"no_outside_msgs": False,
"private": False,
"key": None,
"ops": set([founder]) if founder is not None else set(),
"voiced": set(),
"bans": set(),
"invited": {},
"last_used_ts": None,
}
self._room_state[room] = st
return st
def touch_room(self, room: str) -> None:
"""Update last_used_ts for a room."""
try:
st = self._room_state_ensure(room)
ts = float(time.time())
st["last_used_ts"] = ts
reg = self._room_registry.get(room)
if isinstance(reg, dict):
reg["last_used_ts"] = ts
except Exception:
pass
def get_room_modes(self, room: str) -> dict[str, Any]:
"""Get dict of room mode flags."""
st = self._room_state_ensure(room)
registered = bool(st.get("registered", False))
moderated = bool(st.get("moderated", False))
invite_only = bool(st.get("invite_only", False))
topic_ops_only = bool(st.get("topic_ops_only", False))
no_outside_msgs = bool(st.get("no_outside_msgs", False))
private = bool(st.get("private", False))
key = st.get("key")
has_key = isinstance(key, str) and bool(key)
return {
"registered": registered,
"moderated": moderated,
"invite_only": invite_only,
"topic_ops_only": topic_ops_only,
"no_outside_msgs": no_outside_msgs,
"private": private,
"has_key": has_key,
}
def get_room_mode_string(self, room: str) -> str:
"""Get IRC-style mode string for a room."""
m = self.get_room_modes(room)
flags: list[str] = []
if m.get("invite_only"):
flags.append("i")
if m.get("has_key"):
flags.append("k")
if m.get("moderated"):
flags.append("m")
if m.get("no_outside_msgs"):
flags.append("n")
if m.get("private"):
flags.append("p")
if m.get("registered"):
flags.append("r")
if m.get("topic_ops_only"):
flags.append("t")
return "+" + "".join(flags) if flags else "(none)"
def broadcast_room_mode(
self, room: str, outgoing: list[tuple[RNS.Link, bytes]] | None = None
) -> None:
"""Broadcast current room mode to all members."""
mode_txt = self.get_room_mode_string(room)
recipients = list(self.get_room_members(room))
for other in recipients:
self.hub.message_helper.emit_notice(
outgoing, other, room, f"mode for {room} is now: {mode_txt}"
)
def is_room_moderated(self, room: str) -> bool:
"""Check if room is moderated."""
st = self._room_state_ensure(room)
return bool(st.get("moderated", False))
def is_room_op(self, room: str, peer_hash: bytes | None) -> bool:
"""Check if peer is a room operator."""
if peer_hash is None:
return False
if self.hub.trust_manager.is_server_op(peer_hash):
return True
st = self._room_state_ensure(room)
founder = st.get("founder")
if isinstance(founder, (bytes, bytearray)) and bytes(founder) == peer_hash:
return True
ops = st.get("ops")
return isinstance(ops, set) and peer_hash in ops
def is_room_voiced(self, room: str, peer_hash: bytes | None) -> bool:
"""Check if peer has voice in room."""
if peer_hash is None:
return False
if self.is_room_op(room, peer_hash):
return True
st = self._room_state_ensure(room)
voiced = st.get("voiced")
return isinstance(voiced, set) and peer_hash in voiced
def is_room_banned(self, room: str, peer_hash: bytes | None) -> bool:
"""Check if peer is banned from room."""
if peer_hash is None:
return False
st = self._room_state_ensure(room)
bans = st.get("bans")
return isinstance(bans, set) and peer_hash in bans
def is_invited(self, room: str, peer_hash: bytes) -> bool:
"""Check if peer has a valid (non-expired) invite."""
st = self._room_state_ensure(room)
inv = st.get("invited")
if not isinstance(inv, dict) or not inv:
return False
now = float(time.time())
exp = inv.get(peer_hash)
try:
exp_f = float(exp) if exp is not None else 0.0
except Exception:
exp_f = 0.0
if exp_f <= now:
inv.pop(peer_hash, None)
return False
return True
def prune_expired_invites(self, room: str) -> bool:
"""Remove expired invites from a room. Returns True if any were removed."""
st = self._room_state_ensure(room)
inv = st.get("invited")
if not isinstance(inv, dict) or not inv:
return False
now = float(time.time())
removed_any = False
for h, exp in list(inv.items()):
try:
exp_f = float(exp)
except Exception:
exp_f = 0.0
if exp_f <= now:
inv.pop(h, None)
removed_any = True
return removed_any
def load_registry_from_path(
self, path: str, *, invite_timeout_s: float
) -> tuple[dict[str, dict[str, Any]], str | None]:
"""Load room registry from TOML file. Returns (registry, error_msg)."""
if not path or not os.path.exists(path):
return {}, None
try:
from tomlkit import parse # type: ignore
except ImportError:
return {}, "missing dependency tomlkit"
try:
with open(path, encoding="utf-8") as f:
doc = parse(f.read())
except Exception as e:
return {}, f"parse error: {e}"
rooms_section = doc.get("rooms")
if not isinstance(rooms_section, dict):
return {}, None
registry: dict[str, dict[str, Any]] = {}
now = float(time.time())
for room_name, room_data in rooms_section.items():
if not isinstance(room_data, dict):
continue
founder = room_data.get("founder")
if isinstance(founder, str):
try:
founder = bytes.fromhex(founder.strip().lower().removeprefix("0x"))
except Exception:
founder = None
topic = room_data.get("topic")
if not isinstance(topic, str):
topic = None
moderated = bool(room_data.get("moderated", False))
invite_only = bool(room_data.get("invite_only", False))
topic_ops_only = bool(room_data.get("topic_ops_only", False))
no_outside_msgs = bool(room_data.get("no_outside_msgs", False))
private = bool(room_data.get("private", False))
key = room_data.get("key")
if not isinstance(key, str):
key = None
operators = room_data.get("operators", [])
ops: set[bytes] = set()
if isinstance(operators, list):
for op in operators:
if isinstance(op, str):
try:
ops.add(
bytes.fromhex(op.strip().lower().removeprefix("0x"))
)
except Exception:
continue
voiced_list = room_data.get("voiced", [])
voiced: set[bytes] = set()
if isinstance(voiced_list, list):
for v in voiced_list:
if isinstance(v, str):
try:
voiced.add(
bytes.fromhex(v.strip().lower().removeprefix("0x"))
)
except Exception:
continue
bans_list = room_data.get("bans", [])
bans: set[bytes] = set()
if isinstance(bans_list, list):
for b in bans_list:
if isinstance(b, str):
try:
bans.add(
bytes.fromhex(b.strip().lower().removeprefix("0x"))
)
except Exception:
continue
invited_dict = room_data.get("invited", {})
invited: dict[bytes, float] = {}
if isinstance(invited_dict, dict):
for h, exp in invited_dict.items():
if isinstance(h, str):
try:
h_bytes = bytes.fromhex(
h.strip().lower().removeprefix("0x")
)
exp_f = float(exp)
if exp_f > now:
invited[h_bytes] = exp_f
except Exception:
continue
last_used_ts = room_data.get("last_used_ts")
try:
last_used_ts = float(last_used_ts) if last_used_ts is not None else None
except Exception:
last_used_ts = None
registry[room_name] = {
"founder": founder,
"topic": topic,
"moderated": moderated,
"invite_only": invite_only,
"topic_ops_only": topic_ops_only,
"no_outside_msgs": no_outside_msgs,
"private": private,
"key": key,
"ops": ops,
"voiced": voiced,
"bans": bans,
"invited": invited,
"last_used_ts": last_used_ts,
}
return registry, None
def diff_registry_summary(
self, old: dict[str, dict[str, Any]], new: dict[str, dict[str, Any]]
) -> list[str]:
"""Generate human-readable summary of registry changes."""
old_rooms = set(old.keys())
new_rooms = set(new.keys())
added = sorted(new_rooms - old_rooms)
removed = sorted(old_rooms - new_rooms)
lines: list[str] = []
if added:
preview = ", ".join(added[:10])
suffix = "" if len(added) <= 10 else f" (+{len(added) - 10} more)"
lines.append(f"rooms_added={len(added)}: {preview}{suffix}")
if removed:
preview = ", ".join(removed[:10])
suffix = "" if len(removed) <= 10 else f" (+{len(removed) - 10} more)"
lines.append(f"rooms_removed={len(removed)}: {preview}{suffix}")
if not lines:
lines.append(f"rooms_changed=0 (registered_rooms={len(new_rooms)})")
return lines
def get_registry_path_for_writes(self) -> str | None:
"""Get path to room registry file for write operations."""
from .util import expand_path
p = self.hub.config.room_registry_path
if not p:
return None
return expand_path(str(p))
def persist_room_state(self, link: RNS.Link, room: str | None) -> None:
"""Persist room state to registry TOML file."""
if room is None:
return
reg_path = self.get_registry_path_for_writes()
if not reg_path:
return
st = self._room_state_get(room)
if not st or not st.get("registered"):
return
try:
from tomlkit import dumps, parse, table # type: ignore
except Exception:
return
try:
with self._room_registry_write_lock:
file_stat = None
try:
file_stat = os.stat(reg_path)
except Exception:
file_stat = None
with open(reg_path, encoding="utf-8") as f:
doc = parse(f.read())
rooms = doc.get("rooms")
if rooms is None:
rooms = table()
doc["rooms"] = rooms
room_tbl = rooms.get(room)
if room_tbl is None:
room_tbl = table()
rooms[room] = room_tbl
founder = st.get("founder")
if isinstance(founder, (bytes, bytearray)):
room_tbl["founder"] = bytes(founder).hex()
topic = st.get("topic")
if isinstance(topic, str) and topic.strip():
room_tbl["topic"] = topic
else:
if "topic" in room_tbl:
del room_tbl["topic"]
room_tbl["moderated"] = bool(st.get("moderated", False))
room_tbl["invite_only"] = bool(st.get("invite_only", False))
room_tbl["topic_ops_only"] = bool(st.get("topic_ops_only", False))
room_tbl["no_outside_msgs"] = bool(st.get("no_outside_msgs", False))
key = st.get("key")
if isinstance(key, str) and key:
room_tbl["key"] = key
else:
if "key" in room_tbl:
del room_tbl["key"]
last_used_ts = st.get("last_used_ts")
if last_used_ts is None:
last_used_ts = float(time.time())
try:
room_tbl["last_used_ts"] = float(last_used_ts)
except Exception:
room_tbl["last_used_ts"] = float(time.time())
ops = st.get("ops")
if isinstance(ops, set):
room_tbl["operators"] = sorted(
bytes(x).hex() for x in ops if isinstance(x, (bytes, bytearray))
)
voiced = st.get("voiced")
if isinstance(voiced, set):
room_tbl["voiced"] = sorted(
bytes(x).hex()
for x in voiced
if isinstance(x, (bytes, bytearray))
)
bans = st.get("bans")
if isinstance(bans, set):
room_tbl["bans"] = sorted(
bytes(x).hex()
for x in bans
if isinstance(x, (bytes, bytearray))
)
invited = st.get("invited")
if isinstance(invited, dict):
inv_tbl = {}
now = float(time.time())
for h, exp in invited.items():
if not isinstance(h, (bytes, bytearray)):
continue
try:
exp_f = float(exp)
except Exception:
continue
if exp_f > now:
inv_tbl[bytes(h).hex()] = exp_f
room_tbl["invited"] = inv_tbl
new_text = dumps(doc)
with open(reg_path, "w", encoding="utf-8") as f:
f.write(new_text)
if file_stat is not None:
try:
os.chmod(reg_path, file_stat.st_mode)
except Exception:
pass
except Exception as e:
self.hub.message_helper.notice_to(
link, room, f"room config persist failed: {e}"
)
def delete_room_from_registry(self, link: RNS.Link, room: str) -> None:
"""Remove a room from the registry TOML file."""
reg_path = self.get_registry_path_for_writes()
if not reg_path:
return
try:
from tomlkit import dumps, parse # type: ignore
except Exception:
return
try:
with self._room_registry_write_lock:
file_stat = None
try:
file_stat = os.stat(reg_path)
except Exception:
file_stat = None
with open(reg_path, encoding="utf-8") as f:
doc = parse(f.read())
rooms = doc.get("rooms")
if isinstance(rooms, dict) and room in rooms:
try:
del rooms[room]
except Exception:
rooms.pop(room, None)
new_text = dumps(doc)
with open(reg_path, "w", encoding="utf-8") as f:
f.write(new_text)
if file_stat is not None:
try:
os.chmod(reg_path, file_stat.st_mode)
except Exception:
pass
except Exception as e:
self.hub.message_helper.notice_to(
link, room, f"room unregister persist failed: {e}"
)
def prune_unused_registered_rooms(
self, prune_after_s: float, started_wall_time: float
) -> list[str]:
"""
Prune registered rooms that haven't been used recently.
Returns list of pruned room names.
"""
now = float(time.time())
rooms_to_prune: list[str] = []
for room, reg in list(self._room_registry.items()):
if room in self.rooms and self.rooms.get(room):
continue
last_used = reg.get("last_used_ts")
try:
last_used = float(last_used) if last_used is not None else None
except Exception:
last_used = None
if last_used is None:
last_used = started_wall_time
if (now - float(last_used)) < prune_after_s:
continue
self._room_registry.pop(room, None)
self._room_state.pop(room, None)
rooms_to_prune.append(room)
return rooms_to_prune
def merge_registry_into_state(self, registry: dict[str, dict[str, Any]]) -> None:
"""
Merge registry into live room state.
Updates in-memory state for active rooms with registry data.
"""
for r, st in list(self._room_state.items()):
if not isinstance(st, dict):
continue
reg = registry.get(r)
if reg is None:
if st.get("registered"):
st["registered"] = False
continue
st["registered"] = True
founder = reg.get("founder")
if isinstance(founder, (bytes, bytearray)):
st["founder"] = bytes(founder)
topic = reg.get("topic")
if isinstance(topic, str):
st["topic"] = topic
st["moderated"] = bool(reg.get("moderated", False))
st["invite_only"] = bool(reg.get("invite_only", False))
st["topic_ops_only"] = bool(reg.get("topic_ops_only", False))
st["no_outside_msgs"] = bool(reg.get("no_outside_msgs", False))
st["private"] = bool(reg.get("private", False))
key = reg.get("key")
if isinstance(key, str):
st["key"] = key
ops = reg.get("ops")
if isinstance(ops, set):
st["ops"] = set(ops)
voiced = reg.get("voiced")
if isinstance(voiced, set):
st["voiced"] = set(voiced)
bans = reg.get("bans")
if isinstance(bans, set):
st["bans"] = set(bans)
invited = reg.get("invited")
if isinstance(invited, dict):
st["invited"] = dict(invited)
last_used_ts = reg.get("last_used_ts")
if last_used_ts is not None:
st["last_used_ts"] = last_used_ts
+788
View File
@@ -0,0 +1,788 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
import RNS
from .codec import decode, encode
from .constants import (
B_HELLO_CAPS,
B_HELLO_NICK_LEGACY,
B_RES_ENCODING,
B_RES_ID,
B_RES_KIND,
B_RES_SHA256,
B_RES_SIZE,
K_BODY,
K_NICK,
K_ROOM,
K_SRC,
K_T,
T_HELLO,
T_JOIN,
T_JOINED,
T_MSG,
T_NOTICE,
T_PART,
T_PARTED,
T_PING,
T_PONG,
T_RESOURCE_ENVELOPE,
)
from .envelope import make_envelope, validate_envelope
from .util import normalize_nick
class OutgoingList(list):
"""Custom list that allows attaching callback attributes."""
pass
if TYPE_CHECKING:
from .service import HubService
class MessageRouter:
"""
Handles message routing and dispatching for the RRC hub.
This class is responsible for:
- Decoding and validating incoming packets
- Dispatching messages by type (HELLO, JOIN, PART, MSG, NOTICE, PING, etc.)
- Forwarding messages to appropriate rooms/recipients
- Rate limiting
- Protocol validation
"""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = logging.getLogger("rrcd.router")
def route_packet(
self,
link: RNS.Link,
data: bytes,
outgoing: list[tuple[RNS.Link, bytes]],
) -> None:
"""
Main entry point for routing an incoming packet.
This method should be called with the state lock held.
"""
sess = self.hub.session_manager.sessions.get(link)
if sess is None:
return
self.hub.stats_manager.inc("pkts_in")
self.hub.stats_manager.inc("bytes_in", len(data))
peer_hash = sess.get("peer")
if peer_hash is None:
ri = link.get_remote_identity()
if ri is None:
return
peer_hash = ri.hash
sess["peer"] = peer_hash
if not self.hub.session_manager.refill_and_take(link, 1.0):
self.hub.stats_manager.inc("rate_limited")
if self.log.isEnabledFor(logging.DEBUG):
self.log.debug(
"Rate limited peer=%s link_id=%s",
self.hub._fmt_hash(peer_hash),
self.hub._fmt_link_id(link),
)
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing, link, src=self.hub.identity.hash, text="rate limited"
)
return
try:
env = decode(data)
validate_envelope(env)
except Exception as e:
self.hub.stats_manager.inc("pkts_bad")
self.log.debug(
"Bad packet peer=%s link_id=%s bytes=%s err=%s",
self.hub._fmt_hash(peer_hash),
self.hub._fmt_link_id(link),
len(data),
e,
)
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing, link, src=self.hub.identity.hash, text=f"bad message: {e}"
)
return
t = env.get(K_T)
room = env.get(K_ROOM)
body = env.get(K_BODY)
if self.log.isEnabledFor(logging.DEBUG):
body_len = None
if isinstance(body, (bytes, bytearray)):
body_len = len(body)
elif isinstance(body, str):
body_len = len(body)
self.log.debug(
"RX peer=%s link_id=%s t=%s room=%r bytes=%s body_type=%s body_len=%s",
self.hub._fmt_hash(peer_hash),
self.hub._fmt_link_id(link),
t,
room,
len(data),
type(body).__name__,
body_len,
)
if t == T_PONG:
self._handle_pong(link, sess)
elif t == T_RESOURCE_ENVELOPE:
self._handle_resource_envelope(link, sess, env, outgoing)
elif not sess["welcomed"]:
self._handle_pre_welcome(link, sess, peer_hash, env, outgoing)
elif t == T_HELLO:
self._handle_re_hello(link, sess, peer_hash, env, outgoing)
elif t == T_JOIN:
self._handle_join(link, sess, peer_hash, env, outgoing)
elif t == T_PART:
self._handle_part(link, sess, peer_hash, env, outgoing)
elif t in (T_MSG, T_NOTICE):
self._handle_message(link, sess, peer_hash, env, outgoing)
elif t == T_PING:
self._handle_ping(link, env, outgoing)
def _handle_pong(self, link: RNS.Link, sess: dict[str, Any]) -> None:
"""Handle PONG message."""
self.hub.stats_manager.inc("pongs_in")
sess["awaiting_pong"] = None
def _handle_resource_envelope(
self,
link: RNS.Link,
sess: dict[str, Any],
env: dict,
outgoing: list[tuple[RNS.Link, bytes]],
) -> None:
"""Handle RESOURCE_ENVELOPE message."""
room = env.get(K_ROOM)
body = env.get(K_BODY)
if not self.hub.config.enable_resource_transfer:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="resource transfer disabled",
room=room,
)
return
if not isinstance(body, dict):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="invalid resource envelope body",
room=room,
)
return
rid = body.get(B_RES_ID)
kind = body.get(B_RES_KIND)
size = body.get(B_RES_SIZE)
sha256 = body.get(B_RES_SHA256)
encoding = body.get(B_RES_ENCODING)
if not isinstance(rid, (bytes, bytearray)):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="resource envelope missing id",
room=room,
)
return
if not isinstance(kind, str) or not kind:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="resource envelope missing kind",
room=room,
)
return
if not isinstance(size, int) or size < 0:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="resource envelope invalid size",
room=room,
)
return
if size > self.hub.config.max_resource_bytes:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text=f"resource too large: {size} > {self.hub.config.max_resource_bytes}",
room=room,
)
return
if sha256 is not None and not isinstance(sha256, (bytes, bytearray)):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="resource envelope invalid sha256",
room=room,
)
return
if encoding is not None and not isinstance(encoding, str):
encoding = None
if not self.hub.resource_manager.add_resource_expectation(
link,
rid=bytes(rid),
kind=kind,
size=size,
sha256=bytes(sha256) if sha256 else None,
encoding=encoding,
room=room,
):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="too many pending resource expectations",
room=room,
)
def _handle_pre_welcome(
self,
link: RNS.Link,
sess: dict[str, Any],
peer_hash: bytes,
env: dict,
outgoing: list[tuple[RNS.Link, bytes]],
) -> None:
"""Handle messages before WELCOME (only HELLO is allowed)."""
t = env.get(K_T)
nick = env.get(K_NICK)
body = env.get(K_BODY)
if t != T_HELLO:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing, link, src=self.hub.identity.hash, text="send HELLO first"
)
return
old_nick = sess.get("nick")
new_nick = None
if isinstance(nick, str):
n = normalize_nick(nick, max_chars=self.hub.config.nick_max_chars)
if n is not None:
new_nick = n
sess["nick"] = n
if isinstance(body, dict):
sess["peer_caps"] = self._extract_caps(body)
if new_nick is None:
legacy_nick = body.get(B_HELLO_NICK_LEGACY)
n2 = normalize_nick(
legacy_nick, max_chars=self.hub.config.nick_max_chars
)
if n2 is not None:
new_nick = n2
sess["nick"] = n2
self.log.info(
"HELLO peer=%s nick=%r link_id=%s",
self.hub._fmt_hash(peer_hash),
sess.get("nick"),
self.hub._fmt_link_id(link),
)
self.hub.session_manager.send_welcome(
link,
outgoing,
peer_hash=peer_hash,
old_nick=old_nick,
new_nick=new_nick,
)
def _handle_re_hello(
self,
link: RNS.Link,
sess: dict[str, Any],
peer_hash: bytes,
env: dict,
outgoing: list[tuple[RNS.Link, bytes]],
) -> None:
"""Handle re-authentication (HELLO after already welcomed)."""
nick = env.get(K_NICK)
body = env.get(K_BODY)
if self.hub.identity is None:
return
old_nick = sess.get("nick")
old_rooms = set(sess.get("rooms", set()))
sess["welcomed"] = False
sess["rooms"] = set()
sess["nick"] = None
sess["peer_caps"] = {}
for r in old_rooms:
self.hub.room_manager.remove_member(r, link)
new_nick = None
if isinstance(nick, str):
n = normalize_nick(nick, max_chars=self.hub.config.nick_max_chars)
if n is not None:
new_nick = n
sess["nick"] = n
if isinstance(body, dict):
sess["peer_caps"] = self._extract_caps(body)
if new_nick is None:
legacy_nick = body.get(B_HELLO_NICK_LEGACY)
n2 = normalize_nick(
legacy_nick, max_chars=self.hub.config.nick_max_chars
)
if n2 is not None:
new_nick = n2
sess["nick"] = n2
self.log.info(
"Re-HELLO peer=%s nick=%r link_id=%s",
self.hub._fmt_hash(peer_hash),
sess.get("nick"),
self.hub._fmt_link_id(link),
)
self.hub.session_manager.send_welcome(
link,
outgoing,
peer_hash=peer_hash,
old_nick=old_nick,
new_nick=new_nick,
)
def _handle_join(
self,
link: RNS.Link,
sess: dict[str, Any],
peer_hash: bytes,
env: dict,
outgoing: list[tuple[RNS.Link, bytes]],
) -> None:
"""Handle JOIN message."""
room = env.get(K_ROOM)
body = env.get(K_BODY)
self.hub.stats_manager.inc("joins")
if not isinstance(room, str) or not room:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="JOIN requires room name",
)
return
if len(sess["rooms"]) >= int(self.hub.config.max_rooms_per_session):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing, link, src=self.hub.identity.hash, text="too many rooms"
)
return
try:
r = self.hub._norm_room(room)
except Exception as e:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing, link, src=self.hub.identity.hash, text=str(e)
)
return
if r in self.hub.room_manager._room_registry:
self.hub.room_manager._room_state_ensure(r)
st = self.hub.room_manager._room_state_ensure(r)
if bool(st.get("invite_only", False)):
is_invited = self.hub.room_manager.is_invited(r, peer_hash)
if not self.hub.room_manager.is_room_op(r, peer_hash) and not is_invited:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="invite-only (+i)",
room=r,
)
return
key = st.get("key")
if isinstance(key, str) and key:
is_invited = self.hub.room_manager.is_invited(r, peer_hash)
if not self.hub.room_manager.is_room_op(r, peer_hash) and not is_invited:
provided = body if isinstance(body, str) else None
if provided != key:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="bad key (+k)",
room=r,
)
return
if self.hub.room_manager.is_room_banned(r, peer_hash):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="banned from room",
room=r,
)
return
if not self.hub.room_manager.get_room_members(r):
pass
self.hub.room_manager._room_state_ensure(r, founder=peer_hash)
sess["rooms"].add(r)
self.hub.room_manager.add_member(r, link)
self.log.info(
"JOIN peer=%s nick=%r room=%s link_id=%s",
self.hub._fmt_hash(peer_hash),
sess.get("nick"),
r,
self.hub._fmt_link_id(link),
)
self.hub.room_manager.touch_room(r)
joined_body = None
if self.hub.config.include_joined_member_list:
members: list[bytes] = []
for member_link in self.hub.room_manager.get_room_members(r):
s = self.hub.session_manager.sessions.get(member_link)
ph = s.get("peer") if s else None
if isinstance(ph, (bytes, bytearray)):
members.append(bytes(ph))
joined_body = members
joined = make_envelope(
T_JOINED, src=self.hub.identity.hash, room=r, body=joined_body
)
self.hub.message_helper.queue_env(outgoing, link, joined)
try:
inv = st.get("invited")
if isinstance(inv, dict) and peer_hash in inv:
inv.pop(peer_hash, None)
if bool(st.get("registered")):
self.hub.room_manager.persist_room_state(link, r)
except Exception:
pass
try:
registered = bool(st.get("registered", False))
topic = st.get("topic") if isinstance(st.get("topic"), str) else None
mode_txt = self.hub.room_manager.get_room_mode_string(r)
topic_txt = topic if topic else "(none)"
reg_txt = "registered" if registered else "unregistered"
self.hub.message_helper.emit_notice(
outgoing,
link,
r,
f"room {r}: {reg_txt}; mode={mode_txt}; topic={topic_txt}",
)
except Exception:
pass
def _handle_part(
self,
link: RNS.Link,
sess: dict[str, Any],
peer_hash: bytes,
env: dict,
outgoing: list[tuple[RNS.Link, bytes]],
) -> None:
"""Handle PART message."""
room = env.get(K_ROOM)
self.hub.stats_manager.inc("parts")
if not isinstance(room, str) or not room:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="PART requires room name",
)
return
try:
r = self.hub._norm_room(room)
except Exception as e:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing, link, src=self.hub.identity.hash, text=str(e)
)
return
sess["rooms"].discard(r)
if self.hub.room_manager.get_room_members(r):
self.hub.room_manager.remove_member(r, link)
if not self.hub.room_manager.get_room_members(r):
self.hub.room_manager.remove_member(r, link)
st = self.hub.room_manager._room_state_get(r)
if st is not None:
self.hub.room_manager.touch_room(r)
if st.get("registered"):
self.hub.room_manager.persist_room_state(link, r)
if st is not None and not st.get("registered"):
self.hub.room_manager._room_state.pop(r, None)
parted_body = None
if self.hub.config.include_joined_member_list:
members: list[bytes] = []
for member_link in self.hub.room_manager.get_room_members(r):
s = self.hub.session_manager.sessions.get(member_link)
ph = s.get("peer") if s else None
if isinstance(ph, (bytes, bytearray)):
members.append(bytes(ph))
parted_body = members
if self.hub.identity is not None:
parted = make_envelope(
T_PARTED, src=self.hub.identity.hash, room=r, body=parted_body
)
self.hub.message_helper.queue_env(outgoing, link, parted)
self.log.info(
"PART peer=%s nick=%r room=%s link_id=%s",
self.hub._fmt_hash(peer_hash),
sess.get("nick"),
r,
self.hub._fmt_link_id(link),
)
def _handle_message(
self,
link: RNS.Link,
sess: dict[str, Any],
peer_hash: bytes,
env: dict,
outgoing: list[tuple[RNS.Link, bytes]],
) -> None:
"""Handle MSG and NOTICE messages."""
t = env.get(K_T)
room = env.get(K_ROOM)
body = env.get(K_BODY)
if isinstance(body, str):
cmdline = body.strip()
if cmdline.startswith("/"):
if self.log.isEnabledFor(logging.DEBUG):
self.log.debug(
"Slash command peer=%s link_id=%s cmd=%r room=%r",
self.hub._fmt_hash(peer_hash),
self.hub._fmt_link_id(link),
cmdline,
room,
)
handled = self.hub.command_handler.handle_operator_command(
link, peer_hash=peer_hash, room=room, text=body, outgoing=outgoing
)
if handled:
if self.log.isEnabledFor(logging.DEBUG):
self.log.debug(
"Slash command handled, queued=%d responses",
len(outgoing),
)
return
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="unrecognized command",
room=room,
)
return
if t == T_MSG:
if not isinstance(room, str) or not room:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="message requires room name",
)
return
elif t == T_NOTICE:
if not isinstance(room, str) or not room:
return
try:
r = self.hub._norm_room(room)
except Exception as e:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing, link, src=self.hub.identity.hash, text=str(e)
)
return
if r not in sess["rooms"]:
st = None
if r in self.hub.room_manager._room_registry:
st = self.hub.room_manager._room_state_ensure(r)
elif self.hub.room_manager.get_room_members(r):
st = self.hub.room_manager._room_state_ensure(r)
if st is None:
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="no such room",
room=r,
)
return
if bool(st.get("no_outside_msgs", False)):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="no outside messages (+n)",
room=r,
)
return
if self.hub.room_manager.is_room_banned(r, peer_hash):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="banned from room",
room=r,
)
return
if self.hub.room_manager.is_room_moderated(
r
) and not self.hub.room_manager.is_room_voiced(r, peer_hash):
if self.hub.identity is not None:
self.hub.message_helper.emit_error(
outgoing,
link,
src=self.hub.identity.hash,
text="room is moderated (+m)",
room=r,
)
return
if peer_hash is not None:
env[K_SRC] = (
bytes(peer_hash)
if isinstance(peer_hash, (bytes, bytearray))
else peer_hash
)
env[K_ROOM] = r
incoming_nick = env.get(K_NICK)
if incoming_nick is not None:
n = normalize_nick(incoming_nick, max_chars=self.hub.config.nick_max_chars)
if n is not None:
old_session_nick = sess.get("nick")
if old_session_nick != n:
sess["nick"] = n
self.hub.session_manager.update_nick_index(
link, old_session_nick, n
)
env[K_NICK] = n
else:
env.pop(K_NICK, None)
else:
nick = sess.get("nick")
n = normalize_nick(nick, max_chars=self.hub.config.nick_max_chars)
if n is not None:
env[K_NICK] = n
payload = encode(env)
for other in list(self.hub.room_manager.get_room_members(r)):
self.hub.message_helper.queue_payload(outgoing, other, payload)
if self.log.isEnabledFor(logging.DEBUG):
self.log.debug(
"Forwarded t=%s peer=%s nick=%r room=%s recipients=%s body_type=%s",
t,
self.hub._fmt_hash(peer_hash),
sess.get("nick"),
r,
len(self.hub.room_manager.get_room_members(r)),
type(body).__name__,
)
if t == T_MSG:
self.hub.stats_manager.inc("msgs_forwarded")
else:
self.hub.stats_manager.inc("notices_forwarded")
def _handle_ping(
self,
link: RNS.Link,
env: dict,
outgoing: list[tuple[RNS.Link, bytes]],
) -> None:
"""Handle PING message."""
body = env.get(K_BODY)
self.hub.stats_manager.inc("pings_in")
if self.hub.identity is not None:
pong = make_envelope(T_PONG, src=self.hub.identity.hash, body=body)
self.hub.stats_manager.inc("pongs_out")
self.hub.message_helper.queue_env(outgoing, link, pong)
def _extract_caps(self, body: Any) -> dict[int, Any]:
"""Extract capabilities from HELLO body."""
if not isinstance(body, dict):
return {}
caps = body.get(B_HELLO_CAPS)
return caps if isinstance(caps, dict) else {}
+120 -3697
View File
File diff suppressed because it is too large Load Diff
+264
View File
@@ -0,0 +1,264 @@
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import RNS
if TYPE_CHECKING:
from .service import HubService
@dataclass
class _RateState:
"""Token bucket state for rate limiting."""
tokens: float
last_refill: float
class SessionManager:
"""
Manages session lifecycle for RRC hub connections.
This class is responsible for:
- Session creation and initialization
- Session state management (nicknames, rooms, capabilities)
- Nickname indexing for efficient lookups
- Rate limiting with token bucket algorithm
- Session cleanup and teardown
- Remote identity tracking
"""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = logging.getLogger("rrcd.session")
self.sessions: dict[RNS.Link, dict[str, Any]] = {}
self._rate: dict[RNS.Link, _RateState] = {}
self._index_by_hash: dict[bytes, RNS.Link] = {} # identity hash -> link
self._index_by_nick: dict[str, set[RNS.Link]] = {} # normalized nick -> links
def on_link_established(self, link: RNS.Link) -> None:
"""
Handle new link establishment.
Creates session state and sets up callbacks.
Must be called with state lock held.
"""
self.sessions[link] = {
"welcomed": False,
"rooms": set(),
"peer": None,
"nick": None,
"peer_caps": {},
"awaiting_pong": None,
}
self._rate[link] = _RateState(
tokens=float(self.hub.config.rate_limit_msgs_per_minute),
last_refill=time.monotonic(),
)
self.log.info("Session created link_id=%s", self.hub._fmt_link_id(link))
def on_remote_identified(
self, link: RNS.Link, identity: RNS.Identity | None
) -> tuple[bool, bytes | None]:
"""
Handle remote identity being established.
Returns:
(is_banned, peer_hash) tuple
Must be called with state lock held.
"""
sess = self.sessions.get(link)
if sess is None:
return False, None
if identity is not None:
peer_hash = identity.hash
sess["peer"] = peer_hash
self._index_by_hash[bytes(peer_hash)] = link
banned = self.hub.trust_manager.is_banned(bytes(peer_hash))
if not banned:
self.log.info(
"Remote identified peer=%s link_id=%s",
self.hub._fmt_hash(peer_hash),
self.hub._fmt_link_id(link),
)
return banned, peer_hash
return False, None
def on_link_closed(self, link: RNS.Link) -> tuple[bytes | None, str | None, int]:
"""
Handle link closure and cleanup.
Returns:
(peer_hash, nick, rooms_count) for logging
Must be called with state lock held.
"""
sess = self.sessions.pop(link, None)
self._rate.pop(link, None)
if not sess:
return None, None, 0
peer = sess.get("peer")
nick = sess.get("nick")
rooms_count = len(sess.get("rooms") or ())
if isinstance(peer, (bytes, bytearray)):
self._index_by_hash.pop(bytes(peer), None)
if nick:
self.update_nick_index(link, nick, None)
for room in list(sess["rooms"]):
self.hub.room_manager.remove_member(room, link)
return peer, nick, rooms_count
def update_nick_index(
self, link: RNS.Link, old_nick: str | None, new_nick: str | None
) -> None:
"""
Update nickname index when a nick changes.
Must be called with state lock held.
"""
if old_nick:
old_key = old_nick.strip().lower()
if old_key in self._index_by_nick:
self._index_by_nick[old_key].discard(link)
if not self._index_by_nick[old_key]:
self._index_by_nick.pop(old_key, None)
if new_nick:
new_key = new_nick.strip().lower()
self._index_by_nick.setdefault(new_key, set()).add(link)
def refill_and_take(self, link: RNS.Link, cost: float = 1.0) -> bool:
"""
Token bucket rate limiting.
Refills tokens based on elapsed time and attempts to take `cost` tokens.
Returns True if tokens were available and taken, False if rate limited.
Must be called with state lock held.
"""
state = self._rate.get(link)
if state is None:
return True
now = time.monotonic()
per_min = float(max(1, int(self.hub.config.rate_limit_msgs_per_minute)))
rate_per_s = per_min / 60.0
elapsed = max(0.0, now - state.last_refill)
state.tokens = min(per_min, state.tokens + elapsed * rate_per_s)
state.last_refill = now
if state.tokens < cost:
return False
state.tokens -= cost
return True
def get_session(self, link: RNS.Link) -> dict[str, Any] | None:
"""Get session state for a link."""
return self.sessions.get(link)
def get_link_by_hash(self, peer_hash: bytes) -> RNS.Link | None:
"""Look up link by peer identity hash (O(1))."""
return self._index_by_hash.get(bytes(peer_hash))
def get_links_by_nick(self, nick: str) -> set[RNS.Link]:
"""Look up links by normalized nickname (O(1))."""
key = nick.strip().lower()
return self._index_by_nick.get(key, set()).copy()
def clear_all(self) -> list[RNS.Link]:
"""
Clear all sessions and return list of links for teardown.
Must be called with state lock held.
"""
links = list(self.sessions.keys())
self.sessions.clear()
self._rate.clear()
self._index_by_hash.clear()
self._index_by_nick.clear()
return links
def get_stats(self) -> dict[str, Any]:
"""Get session statistics for monitoring."""
total = len(self.sessions)
welcomed = sum(1 for s in self.sessions.values() if s.get("welcomed"))
identified = sum(1 for s in self.sessions.values() if s.get("peer") is not None)
return {
"total": total,
"welcomed": welcomed,
"identified": identified,
"indexed_by_hash": len(self._index_by_hash),
"indexed_by_nick": len(self._index_by_nick),
}
def send_welcome(
self,
link: RNS.Link,
outgoing: list[tuple[RNS.Link, bytes]],
*,
peer_hash: bytes,
old_nick: str | None = None,
new_nick: str | None = None,
) -> None:
"""
Send WELCOME message to a client and optionally MOTD.
This handles:
- Setting session as welcomed
- Updating nick index if needed
- Queueing WELCOME message
- Setting up MOTD callback for post-send delivery
Must be called with state lock held.
"""
from .constants import RES_KIND_MOTD, T_NOTICE
sess = self.sessions.get(link)
if sess is None:
return
if old_nick != new_nick:
self.update_nick_index(link, old_nick, new_nick)
sess["welcomed"] = True
self.hub.message_helper.queue_welcome(
outgoing,
link,
peer_hash=peer_hash,
motd=self.hub.config.greeting,
)
if self.hub.config.greeting:
def send_motd():
self.hub.message_helper.send_text_smart(
link,
msg_type=T_NOTICE,
text=self.hub.config.greeting,
room=None,
kind=RES_KIND_MOTD,
)
if not hasattr(outgoing, "_post_send_callbacks"):
outgoing._post_send_callbacks = [] # type: ignore
outgoing._post_send_callbacks.append(send_motd) # type: ignore
+158
View File
@@ -0,0 +1,158 @@
"""Statistics tracking and reporting for the RRC hub."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .service import HubService
class StatsManager:
"""
Manages hub statistics collection and reporting.
Tracks counters for:
- Bytes in/out
- Packets processed
- Rate limiting events
- Errors sent
- Room joins/parts
- Messages forwarded
- Ping/pong activity
- Announces
- Resource transfers
"""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = hub.log
self.started_wall_time: float | None = None
self.started_monotonic: float | None = None
self._counters: dict[str, int] = {
"bytes_in": 0,
"bytes_out": 0,
"pkts_in": 0,
"pkts_bad": 0,
"rate_limited": 0,
"errors_sent": 0,
"joins": 0,
"parts": 0,
"msgs_forwarded": 0,
"notices_forwarded": 0,
"pings_in": 0,
"pongs_in": 0,
"pings_out": 0,
"pongs_out": 0,
"announces": 0,
"resources_sent": 0,
"resources_received": 0,
"resources_rejected": 0,
"resource_bytes_sent": 0,
"resource_bytes_received": 0,
}
def set_start_time(self) -> None:
"""Set the start time for uptime calculations."""
self.started_wall_time = time.time()
self.started_monotonic = time.monotonic()
def inc(self, key: str, delta: int = 1) -> None:
"""Increment a counter by the given delta."""
try:
with self.hub._state_lock:
self._counters[key] = int(self._counters.get(key, 0)) + int(delta)
except Exception:
pass
def format_stats(self) -> str:
"""Format current statistics as a human-readable string."""
from . import __version__
now_mono = time.monotonic()
started_mono = self.started_monotonic
uptime_s = (now_mono - started_mono) if started_mono is not None else 0.0
with self.hub._state_lock:
session_stats = self.hub.session_manager.get_stats()
sessions_total = session_stats["total"]
sessions_welcomed = session_stats["welcomed"]
sessions_identified = session_stats["identified"]
room_stats = self.hub.room_manager.get_stats()
rooms_total = room_stats["rooms_total"]
memberships = room_stats["memberships"]
top_rooms = room_stats["top_rooms"]
trust_stats = self.hub.trust_manager.get_stats()
trusted_count = trust_stats["trusted_count"]
banned_count = trust_stats["banned_count"]
c = dict(self._counters)
lines: list[str] = []
lines.append(f"rrcd {__version__} stats")
lines.append(f"uptime_s={uptime_s:.1f}")
lines.append(
f"clients_total={sessions_total} "
f"clients_identified={sessions_identified} "
f"clients_welcomed={sessions_welcomed}"
)
lines.append(f"rooms={rooms_total} memberships={memberships}")
if top_rooms:
lines.append("top_rooms=" + ", ".join(f"{r}:{n}" for r, n in top_rooms))
lines.append(f"trust: trusted={trusted_count} banned={banned_count}")
lines.append(
f"limits: rate_limit_msgs_per_minute={self.hub.config.rate_limit_msgs_per_minute} "
f"max_rooms_per_session={self.hub.config.max_rooms_per_session} "
f"max_room_name_len={self.hub.config.max_room_name_len} "
f"nick_max_chars={self.hub.config.nick_max_chars}"
)
lines.append(
f"features: ping_interval_s={self.hub.config.ping_interval_s} "
f"ping_timeout_s={self.hub.config.ping_timeout_s} "
f"announce_on_start={self.hub.config.announce_on_start} "
f"announce_period_s={self.hub.config.announce_period_s}"
)
lines.append(
"io: pkts_in={} pkts_bad={} bytes_in={} bytes_out={}".format(
c.get("pkts_in", 0),
c.get("pkts_bad", 0),
c.get("bytes_in", 0),
c.get("bytes_out", 0),
)
)
lines.append(
"events: joins={} parts={} msgs_fwd={} notices_fwd={} errors_sent={} rate_limited={}".format(
c.get("joins", 0),
c.get("parts", 0),
c.get("msgs_forwarded", 0),
c.get("notices_forwarded", 0),
c.get("errors_sent", 0),
c.get("rate_limited", 0),
)
)
lines.append(
"pings: in={} out={} pongs: in={} out={}".format(
c.get("pings_in", 0),
c.get("pings_out", 0),
c.get("pongs_in", 0),
c.get("pongs_out", 0),
)
)
lines.append(
"resources: sent={} received={} rejected={} bytes_sent={} bytes_received={}".format(
c.get("resources_sent", 0),
c.get("resources_received", 0),
c.get("resources_rejected", 0),
c.get("resource_bytes_sent", 0),
c.get("resource_bytes_received", 0),
)
)
return "".join(lines)
+181
View File
@@ -0,0 +1,181 @@
"""Trust and ban management for the RRC hub."""
from __future__ import annotations
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import RNS
from .service import HubService
class TrustManager:
"""
Manages trusted and banned identities for the hub.
Handles:
- Trusted identity lists (server operators)
- Banned identity lists
- Persistence of ban list to config
- Trust/ban checks
"""
def __init__(self, hub: HubService) -> None:
self.hub = hub
self.log = hub.log
self._trusted: set[bytes] = set()
self._banned: set[bytes] = set()
def load_from_config(
self, trusted_list: list[str] | None, banned_list: list[str] | None
) -> None:
"""Load trusted and banned identities from config lists."""
self._trusted = {
self.hub._parse_identity_hash(h)
for h in (trusted_list or ())
if str(h).strip()
}
self._banned = {
self.hub._parse_identity_hash(h)
for h in (banned_list or ())
if str(h).strip()
}
def is_trusted(self, peer_hash: bytes | None) -> bool:
"""Check if a peer identity is in the trusted list."""
if not peer_hash:
return False
with self.hub._state_lock:
return peer_hash in self._trusted
def is_server_op(self, peer_hash: bytes | None) -> bool:
"""Check if a peer is a server operator (currently same as trusted)."""
return self.is_trusted(peer_hash)
def is_banned(self, peer_hash: bytes | None) -> bool:
"""Check if a peer identity is in the banned list."""
if not peer_hash:
return False
with self.hub._state_lock:
return peer_hash in self._banned
def add_ban(self, peer_hash: bytes) -> None:
"""Add a peer identity to the banned list."""
with self.hub._state_lock:
self._banned.add(peer_hash)
def remove_ban(self, peer_hash: bytes) -> None:
"""Remove a peer identity from the banned list."""
with self.hub._state_lock:
self._banned.discard(peer_hash)
def get_stats(self) -> dict[str, int]:
"""Get statistics about trusted and banned identities."""
with self.hub._state_lock:
return {
"trusted_count": len(self._trusted),
"banned_count": len(self._banned),
}
def update_from_config(
self, trusted_list: list[str] | None, banned_list: list[str] | None
) -> tuple[set[bytes], set[bytes]]:
"""
Update trusted and banned lists from config.
Returns the old (trusted, banned) sets for comparison.
"""
with self.hub._state_lock:
old_trusted = set(self._trusted)
old_banned = set(self._banned)
new_trusted = {
self.hub._parse_identity_hash(h)
for h in (trusted_list or ())
if str(h).strip()
}
new_banned = {
self.hub._parse_identity_hash(h)
for h in (banned_list or ())
if str(h).strip()
}
with self.hub._state_lock:
self._trusted = new_trusted
self._banned = new_banned
return old_trusted, old_banned
def persist_banned_identities_to_config(
self,
link: RNS.Link,
room: str | None,
outgoing: list[tuple[RNS.Link, bytes]] | None = None,
) -> None:
"""Persist the current banned identities list to the config file."""
cfg_path = self.hub.config_manager.get_config_path_for_writes()
if not cfg_path:
self.hub.message_helper.emit_notice(
outgoing, link, room, "ban updated (not persisted; no config_path)"
)
return
try:
from tomlkit import dumps, parse, table # type: ignore
except Exception:
self.hub.message_helper.emit_notice(
outgoing,
link,
room,
"ban updated (not persisted; missing dependency tomlkit)",
)
return
try:
with self.hub.config_manager.get_write_lock():
st = None
try:
st = os.stat(cfg_path)
except Exception:
st = None
with open(cfg_path, encoding="utf-8") as f:
doc = parse(f.read())
hub = doc.get("hub")
if hub is None:
hub = table()
doc["hub"] = hub
existing = hub.get("banned_identities")
existing_list: list[str] = []
if isinstance(existing, list):
for x in existing:
if x is None:
continue
sx = str(x).strip().lower()
if sx.startswith("0x"):
sx = sx[2:]
if sx:
existing_list.append(sx)
with self.hub._state_lock:
merged = set(existing_list)
merged.update(h.hex() for h in sorted(self._banned))
hub["banned_identities"] = sorted(merged)
new_text = dumps(doc)
with open(cfg_path, "w", encoding="utf-8") as f:
f.write(new_text)
if st is not None:
try:
os.chmod(cfg_path, st.st_mode)
except Exception:
pass
except Exception as e:
self.hub.message_helper.emit_notice(
outgoing, link, room, f"ban updated (persist failed: {e})"
)
-2
View File
@@ -25,8 +25,6 @@ def normalize_nick(value, *, max_chars: int = _DEFAULT_NICK_MAX_CHARS) -> str |
if limit > 0 and len(s) > limit:
return None
# Keep this conservative: avoid embedded newlines or NUL, which frequently
# cause UI/log formatting issues.
if "\n" in s or "\r" in s or "\x00" in s:
return None