diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d1fb7b..92a62bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,21 @@ This project follows the versioning policy in VERSIONING.md. +## 0.1.2 - 2026-01-01 + +- Implemented RNS.Resource transfer for messages exceeding MTU limits, with resource envelope handling and automatic fallback +- Allow hub-directed commands (e.g., `/stats`, `/reload`, `/who`, `/kline`) to be sent without a room field +- Removed validation that rejected empty room fields in envelopes, per RRC specification +- Hub-level commands now send responses with no room field (`room=None`) for better client compatibility +- Refactored greeting messages to use dedicated MOTD resource kind for clearer semantics +- Added missing configuration options to default config template + + +## 0.1.1 - 2025-12-30 + +- Protocol extension: hub may attach an optional nickname (`K_NICK = 7`) to forwarded `MSG`/`NOTICE` envelopes for improved user identification + + ## 0.1.0 - 2025-12-29 Initial public release. @@ -13,7 +28,3 @@ Initial public release. - Persistent config + room registry in TOML (`rrcd.toml`, `rooms.toml`) - Reduced lock contention by flushing outbound packets outside the shared state lock - Added small packaging metadata and README polish - -## 0.1.1 - 2025-12-30 - -- Protocol extension: hub may attach an optional nickname (`K_NICK = 7`) to forwarded `MSG`/`NOTICE` envelopes based on the nickname provided in `HELLO`. diff --git a/README.md b/README.md index 2f79d37..3cc51d3 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,40 @@ Wire-level extensions (backwards-compatible): UTF-8 encodable, contain no newlines/NUL, and are at most `nick_max_chars` characters (default: 32). +- **Large payload transfer via RNS.Resource**: For messages that exceed the link + MTU (Maximum Data Unit), `rrcd` can automatically use RNS.Resource for + reliable large payload transfer instead of manual chunking. + + This is implemented as a two-part protocol: + 1. Send a small `RESOURCE_ENVELOPE` message (type 50) via normal packet, + announcing the incoming resource with metadata (id, kind, size, SHA256). + 2. Send the actual payload via `RNS.Resource`. + + The receiving side matches the resource to the expectation and validates + integrity. Supported resource kinds include: + - `notice`: Large NOTICE text messages + - `motd`: Message of the day / server greeting + - `blob`: Generic binary data + + Configuration (in `rrcd.toml`): + ```toml + [hub] + enable_resource_transfer = true # default: true + max_resource_bytes = 262144 # 256 KiB default + max_pending_resource_expectations = 8 # per link + resource_expectation_ttl_s = 30.0 # expectation timeout + ``` + + Safety controls: + - Resources are only accepted if they match a recent expectation + - Size limits enforced (default 256 KiB) + - SHA256 verification for integrity + - TTL-based expectation expiry (default 30 seconds) + - Per-link expectation limit to prevent memory exhaustion + + Fallback: If resource transfer is disabled or fails, NOTICE messages fall + back to the original line-based chunking method. + Configure trusted operators and banned identities in the TOML config: - `trusted_identities`: list of Reticulum Identity hashes (hex) allowed to run diff --git a/pyproject.toml b/pyproject.toml index 494f451..67e2c0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "rrcd" -version = "0.1.1" +version = "0.1.2" description = "Reticulum Relay Chat daemon (hub service)" readme = "README.md" license = { file = "LICENSE" } diff --git a/rrcd/cli.py b/rrcd/cli.py index 404b831..87670b5 100644 --- a/rrcd/cli.py +++ b/rrcd/cli.py @@ -121,8 +121,8 @@ announce_period_s = 0.0 hub_name = "rrc" greeting = "" -# Note: The hub greeting is delivered after WELCOME via one or more NOTICE -# messages. NOTICE payloads are chunked as needed to fit the Link MTU. +# Note: The hub 'greeting' is the MOTD (message of the day) delivered after WELCOME. +# If it exceeds the link MTU, it will be sent via RNS.Resource for reliable transfer. # Operator / moderation # @@ -158,6 +158,21 @@ rate_limit_msgs_per_minute = 240 ping_interval_s = 0.0 ping_timeout_s = 0.0 +# Large payload transfer via RNS.Resource +# +# When a message exceeds the link MTU, rrcd can use RNS.Resource for reliable +# transfer instead of manual chunking. A small RESOURCE_ENVELOPE is sent first, +# followed by the payload as an RNS.Resource. +# +# enable_resource_transfer: enable/disable feature (default: true) +# max_resource_bytes: maximum size for a single resource (default: 256 KiB) +# max_pending_resource_expectations: max pending expectations per link (default: 8) +# resource_expectation_ttl_s: how long to wait for announced resource (default: 30s) +enable_resource_transfer = true +max_resource_bytes = 262144 +max_pending_resource_expectations = 8 +resource_expectation_ttl_s = 30.0 + [logging] # Log level for rrcd itself. diff --git a/rrcd/config.py b/rrcd/config.py index b58c200..7a82033 100644 --- a/rrcd/config.py +++ b/rrcd/config.py @@ -26,6 +26,10 @@ class HubRuntimeConfig: rate_limit_msgs_per_minute: int = 240 ping_interval_s: float = 0.0 ping_timeout_s: float = 0.0 + max_resource_bytes: int = 256 * 1024 # 256 KiB default + max_pending_resource_expectations: int = 8 + resource_expectation_ttl_s: float = 30.0 + enable_resource_transfer: bool = True log_level: str = "INFO" log_rns_level: str = "WARNING" log_console: bool = True diff --git a/rrcd/constants.py b/rrcd/constants.py index 1cf43ef..789cc95 100644 --- a/rrcd/constants.py +++ b/rrcd/constants.py @@ -29,6 +29,8 @@ T_PONG = 31 T_ERROR = 40 +T_RESOURCE_ENVELOPE = 50 + # HELLO body keys # Per spec: key assignments are fixed. B_HELLO_NAME = 0 @@ -46,3 +48,15 @@ B_WELCOME_CAPS = 2 # Capabilities map keys (values are advisory). Keep these small and numeric. CAP_RESOURCE_ENVELOPE = 0 + +# RESOURCE_ENVELOPE body keys +B_RES_ID = 0 +B_RES_KIND = 1 +B_RES_SIZE = 2 +B_RES_SHA256 = 3 +B_RES_ENCODING = 4 + +# Resource kinds (string values) +RES_KIND_NOTICE = "notice" +RES_KIND_MOTD = "motd" +RES_KIND_BLOB = "blob" diff --git a/rrcd/envelope.py b/rrcd/envelope.py index a6e5be4..366d420 100644 --- a/rrcd/envelope.py +++ b/rrcd/envelope.py @@ -85,8 +85,7 @@ def validate_envelope(env: dict) -> None: room = env[K_ROOM] if not isinstance(room, str): raise TypeError("room name must be a string") - if room == "": - raise ValueError("room name must not be empty") + # Per RRC spec, room field may be empty (e.g., for hub commands) if K_NICK in env: nick = env[K_NICK] diff --git a/rrcd/service.py b/rrcd/service.py index ee50e61..043dd5f 100644 --- a/rrcd/service.py +++ b/rrcd/service.py @@ -1,5 +1,6 @@ from __future__ import annotations +import hashlib import logging import os import signal @@ -16,6 +17,11 @@ from .config import HubRuntimeConfig from .constants import ( B_HELLO_CAPS, B_HELLO_NICK_LEGACY, + B_RES_ENCODING, + B_RES_ID, + B_RES_KIND, + B_RES_SHA256, + B_RES_SIZE, B_WELCOME_HUB, B_WELCOME_VER, K_BODY, @@ -23,6 +29,9 @@ from .constants import ( K_ROOM, K_SRC, K_T, + RES_KIND_BLOB, + RES_KIND_MOTD, + RES_KIND_NOTICE, T_ERROR, T_HELLO, T_JOIN, @@ -33,6 +42,7 @@ from .constants import ( T_PARTED, T_PING, T_PONG, + T_RESOURCE_ENVELOPE, T_WELCOME, ) from .envelope import make_envelope, validate_envelope @@ -46,6 +56,19 @@ class _RateState: last_refill: float +@dataclass +class _ResourceExpectation: + """Tracks an expected incoming Resource transfer.""" + id: bytes + kind: str + size: int + sha256: bytes | None + encoding: str | None + created_at: float + expires_at: float + room: str | None = None + + class HubService: def __init__(self, config: HubRuntimeConfig) -> None: self.config = config @@ -65,6 +88,10 @@ class HubService: self.sessions: dict[RNS.Link, dict[str, Any]] = {} self._rate: dict[RNS.Link, _RateState] = {} + # Resource transfer state + self._resource_expectations: dict[RNS.Link, dict[bytes, _ResourceExpectation]] = {} + self._active_resources: dict[RNS.Link, set[RNS.Resource]] = {} + self._trusted: set[bytes] = set() self._banned: set[bytes] = set() @@ -100,6 +127,11 @@ class HubService: "pings_out": 0, "pongs_out": 0, "announces": 0, + "resources_sent": 0, + "resources_received": 0, + "resources_rejected": 0, + "resource_bytes_sent": 0, + "resource_bytes_received": 0, } def _extract_caps(self, body: Any) -> dict[int, Any]: @@ -186,12 +218,12 @@ class HubService: link: RNS.Link, *, peer_hash: Any, - greeting: str | None, + motd: str | None, ) -> None: if self.identity is None: return - g = str(greeting) if greeting else "" + g = str(motd) if motd else "" body_w: dict[int, Any] = { B_WELCOME_HUB: self.config.hub_name, B_WELCOME_VER: str(__version__), @@ -216,9 +248,11 @@ class HubService: self._fmt_link_id(link), ) - # The hub greeting is delivered as NOTICE after WELCOME. + # The hub MOTD (message of the day) is delivered after WELCOME. if g: - self._queue_notice_chunks(outgoing, link, room=None, text=g) + self._send_text_smart( + link, msg_type=T_NOTICE, text=g, room=None, outgoing=outgoing, kind=RES_KIND_MOTD + ) def _inc(self, key: str, delta: int = 1) -> None: try: @@ -227,6 +261,471 @@ class HubService: except Exception: pass + # Resource transfer methods + + def _cleanup_expired_expectations(self, link: RNS.Link) -> None: + """Remove expired resource expectations for a link.""" + now = time.time() + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return + + expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now] + for rid in expired: + exp_dict.pop(rid, None) + self.log.debug( + "Expired resource expectation link_id=%s rid=%s", + self._fmt_link_id(link), + rid.hex() if isinstance(rid, bytes) else rid, + ) + + def _add_resource_expectation( + self, + link: RNS.Link, + *, + rid: bytes, + kind: str, + size: int, + sha256: bytes | None = None, + encoding: str | None = None, + room: str | None = None, + ) -> bool: + """Add a resource expectation. Returns False if limit exceeded.""" + self._cleanup_expired_expectations(link) + + exp_dict = self._resource_expectations.setdefault(link, {}) + + if len(exp_dict) >= self.config.max_pending_resource_expectations: + self.log.warning( + "Max pending expectations exceeded link_id=%s", + self._fmt_link_id(link), + ) + return False + + now = time.time() + exp = _ResourceExpectation( + id=rid, + kind=kind, + size=size, + sha256=sha256, + encoding=encoding, + created_at=now, + expires_at=now + self.config.resource_expectation_ttl_s, + room=room, + ) + exp_dict[rid] = exp + + self.log.debug( + "Added resource expectation link_id=%s rid=%s kind=%s size=%s", + self._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + return True + + def _find_resource_expectation( + self, link: RNS.Link, size: int + ) -> _ResourceExpectation | None: + """Find a matching resource expectation by size (fallback matching).""" + self._cleanup_expired_expectations(link) + + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return None + + # Match by size (first match wins) + for exp in exp_dict.values(): + if exp.size == size: + return exp + + return None + + def _pop_resource_expectation( + self, link: RNS.Link, rid: bytes + ) -> _ResourceExpectation | None: + """Remove and return a resource expectation.""" + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return None + return exp_dict.pop(rid, None) + + def _resource_advertised(self, resource: RNS.Resource) -> bool: + """ + Callback when a Resource is advertised by remote peer. + Returns True to accept, False to reject. + """ + link = resource.link + + if not self.config.enable_resource_transfer: + self.log.debug( + "Rejecting resource (disabled) link_id=%s", + self._fmt_link_id(link), + ) + self._inc("resources_rejected") + return False + + with self._state_lock: + sess = self.sessions.get(link) + if not sess: + self.log.debug( + "Rejecting resource (no session) link_id=%s", + self._fmt_link_id(link), + ) + self._inc("resources_rejected") + return False + + # Check size limit + size = resource.total_size if hasattr(resource, "total_size") else resource.size + if size > self.config.max_resource_bytes: + self.log.warning( + "Rejecting resource (too large: %s > %s) link_id=%s", + size, + self.config.max_resource_bytes, + self._fmt_link_id(link), + ) + self._inc("resources_rejected") + return False + + # Check for matching expectation + exp = self._find_resource_expectation(link, size) + if not exp: + self.log.warning( + "Rejecting resource (no matching expectation) link_id=%s size=%s", + self._fmt_link_id(link), + size, + ) + self._inc("resources_rejected") + return False + + # Accept + self.log.info( + "Accepting resource link_id=%s size=%s kind=%s", + self._fmt_link_id(link), + size, + exp.kind, + ) + self._active_resources.setdefault(link, set()).add(resource) + return True + + def _resource_concluded(self, resource: RNS.Resource) -> None: + """Callback when a Resource transfer completes.""" + link = resource.link + + with self._state_lock: + # Remove from active set + active_set = self._active_resources.get(link) + if active_set: + active_set.discard(resource) + + if resource.status != RNS.Resource.COMPLETE: + self.log.warning( + "Resource transfer failed link_id=%s status=%s", + self._fmt_link_id(link), + resource.status, + ) + return + + # Get payload + try: + payload = resource.data.read() if hasattr(resource.data, "read") else resource.data + if isinstance(payload, bytearray): + payload = bytes(payload) + except Exception as e: + self.log.error( + "Failed to read resource data link_id=%s: %s", + self._fmt_link_id(link), + e, + ) + return + + size = len(payload) + + # Find and remove expectation + exp = self._find_resource_expectation(link, size) + if not exp: + self.log.warning( + "Received resource without expectation link_id=%s size=%s", + self._fmt_link_id(link), + size, + ) + return + + self._pop_resource_expectation(link, exp.id) + + # Verify SHA256 if provided + if exp.sha256: + actual_hash = hashlib.sha256(payload).digest() + if actual_hash != exp.sha256: + self.log.error( + "Resource SHA256 mismatch link_id=%s expected=%s actual=%s", + self._fmt_link_id(link), + exp.sha256.hex(), + actual_hash.hex(), + ) + return + + self._inc("resources_received") + self._inc("resource_bytes_received", size) + + self.log.info( + "Resource received link_id=%s size=%s kind=%s", + self._fmt_link_id(link), + size, + exp.kind, + ) + + # Dispatch by kind + try: + self._dispatch_received_resource(link, exp, payload) + except Exception as e: + self.log.exception( + "Failed to dispatch resource link_id=%s kind=%s: %s", + self._fmt_link_id(link), + exp.kind, + e, + ) + + def _dispatch_received_resource( + self, link: RNS.Link, exp: _ResourceExpectation, payload: bytes + ) -> None: + """Dispatch a received resource payload to appropriate handler.""" + if exp.kind == RES_KIND_NOTICE: + # Decode as text and deliver as notice + encoding = exp.encoding or "utf-8" + try: + text = payload.decode(encoding) + except Exception as e: + self.log.error( + "Failed to decode notice resource link_id=%s encoding=%s: %s", + self._fmt_link_id(link), + encoding, + e, + ) + return + + # Log the notice (don't send back to sender) + self.log.info( + "Received large NOTICE via resource link_id=%s room=%r chars=%s", + self._fmt_link_id(link), + exp.room, + len(text), + ) + # Note: In a full implementation, this would be forwarded to other room members + # For now, just acknowledge receipt + + elif exp.kind == RES_KIND_MOTD: + # Similar to NOTICE + encoding = exp.encoding or "utf-8" + try: + text = payload.decode(encoding) + except Exception as e: + self.log.error( + "Failed to decode MOTD resource link_id=%s: %s", + self._fmt_link_id(link), + e, + ) + return + + self.log.info( + "Received MOTD via resource link_id=%s chars=%s", + self._fmt_link_id(link), + len(text), + ) + + elif exp.kind == RES_KIND_BLOB: + # Generic binary data + self.log.info( + "Received BLOB via resource link_id=%s bytes=%s", + self._fmt_link_id(link), + len(payload), + ) + else: + self.log.warning( + "Unknown resource kind link_id=%s kind=%s", + self._fmt_link_id(link), + exp.kind, + ) + + def _send_via_resource( + self, + link: RNS.Link, + *, + kind: str, + payload: bytes, + room: str | None = None, + encoding: str | None = None, + ) -> bool: + """ + Send large payload via Resource. + Returns True if successfully initiated, False otherwise. + """ + if not self.config.enable_resource_transfer: + return False + + size = len(payload) + if size > self.config.max_resource_bytes: + self.log.error( + "Payload too large for resource transfer: %s > %s", + size, + self.config.max_resource_bytes, + ) + return False + + # Generate resource ID + rid = os.urandom(8) + + # Compute SHA256 + sha256 = hashlib.sha256(payload).digest() + + # Send envelope first + if self.identity is None: + return False + + envelope_body = { + B_RES_ID: rid, + B_RES_KIND: kind, + B_RES_SIZE: size, + B_RES_SHA256: sha256, + } + if encoding: + envelope_body[B_RES_ENCODING] = encoding + + envelope = make_envelope( + T_RESOURCE_ENVELOPE, + src=self.identity.hash, + room=room, + body=envelope_body, + ) + + try: + envelope_payload = encode(envelope) + RNS.Packet(link, envelope_payload).send() + self._inc("bytes_out", len(envelope_payload)) + + self.log.debug( + "Sent resource envelope link_id=%s rid=%s kind=%s size=%s", + self._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + except Exception as e: + self.log.error( + "Failed to send resource envelope link_id=%s: %s", + self._fmt_link_id(link), + e, + ) + return False + + # Create and advertise resource + try: + resource = RNS.Resource(payload, link, advertise=True, auto_compress=False) + + with self._state_lock: + self._active_resources.setdefault(link, set()).add(resource) + + self._inc("resources_sent") + self._inc("resource_bytes_sent", size) + + self.log.info( + "Sent resource link_id=%s rid=%s kind=%s size=%s", + self._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + return True + + except Exception as e: + self.log.error( + "Failed to create resource link_id=%s: %s", + self._fmt_link_id(link), + e, + ) + return False + + def _send_text_smart( + self, + link: RNS.Link, + *, + msg_type: int, + text: str, + room: str | None = None, + encoding: str = "utf-8", + outgoing: list[tuple[RNS.Link, bytes]] | None = None, + kind: str | None = None, + ) -> None: + """ + Send text message using best method (packet or resource). + Falls back to chunking if resource transfer fails or is disabled. + + Args: + kind: Resource kind if sent via resource (default: RES_KIND_NOTICE) + """ + if self.identity is None: + return + + # Try encoding as a single packet first + env = make_envelope(msg_type, src=self.identity.hash, room=room, body=text) + payload = encode(env) + + # If it fits, send normally + if self._packet_would_fit(link, payload): + if outgoing is None: + self._send(link, env) + else: + self._queue_env(outgoing, link, env) + return + + # Too large for packet - try resource if enabled and type is NOTICE + if ( + self.config.enable_resource_transfer + and msg_type == T_NOTICE + and len(text.encode(encoding)) <= self.config.max_resource_bytes + ): + text_bytes = text.encode(encoding) + resource_kind = kind if kind is not None else RES_KIND_NOTICE + if self._send_via_resource( + link, + kind=resource_kind, + payload=text_bytes, + room=room, + encoding=encoding, + ): + self.log.debug( + "Sent large text via resource link_id=%s kind=%s chars=%s", + self._fmt_link_id(link), + resource_kind, + len(text), + ) + return + + # Fall back to chunking for NOTICE + if msg_type == T_NOTICE: + if outgoing is None: + outgoing = [] + self._queue_notice_chunks(outgoing, link, room=room, text=text) + for out_link, chunk_payload in outgoing: + self._inc("bytes_out", len(chunk_payload)) + try: + RNS.Packet(out_link, chunk_payload).send() + except Exception as e: + self.log.warning( + "Failed to send chunk link_id=%s: %s", + self._fmt_link_id(out_link), + e, + ) + else: + self._queue_notice_chunks(outgoing, link, room=room, text=text) + else: + # For other message types, just drop or log error + self.log.error( + "Message too large and not NOTICE link_id=%s type=%s", + self._fmt_link_id(link), + msg_type, + ) + def start(self) -> None: self.log.info("Starting Reticulum") if self._started_wall_time is None: @@ -346,6 +845,8 @@ class HubService: self.sessions.clear() self.rooms.clear() self._rate.clear() + self._resource_expectations.clear() + self._active_resources.clear() for link in links: try: @@ -1440,6 +1941,15 @@ class HubService: c.get("pongs_out", 0), ) ) + lines.append( + "resources: sent={} received={} rejected={} bytes_sent={} bytes_received={}".format( + c.get("resources_sent", 0), + c.get("resources_received", 0), + c.get("resources_rejected", 0), + c.get("resource_bytes_sent", 0), + c.get("resource_bytes_received", 0), + ) + ) return "\n".join(lines) @@ -1516,10 +2026,11 @@ class HubService: link, src=self.identity.hash, text="not authorized", - room=room, + room=None, ) return True - self._reload_config_and_rooms(link, room, outgoing) + # Hub-level command - send responses without room field + self._reload_config_and_rooms(link, None, outgoing) return True # Global/server-operator commands @@ -1531,10 +2042,11 @@ class HubService: link, src=self.identity.hash, text="not authorized", - room=room, + room=None, ) return True - self._emit_notice(outgoing, link, room, self._format_stats()) + # Send response without room field for hub-level command + self._emit_notice(outgoing, link, None, self._format_stats()) return True if cmd in ("who", "names"): @@ -1542,12 +2054,12 @@ class HubService: if len(parts) >= 2: target_room = parts[1] if not isinstance(target_room, str) or not target_room: - self._emit_notice(outgoing, link, room, "usage: /who [room]") + self._emit_notice(outgoing, link, None, "usage: /who [room]") return True try: r = self._norm_room(target_room) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self._emit_notice(outgoing, link, None, f"bad room: {e}") return True members = [] @@ -1562,10 +2074,11 @@ class HubService: members.append(f"{nick} ({ident[:12]})") else: members.append(ident) + # Send response without room field for hub-level query self._emit_notice( outgoing, link, - room, + None, f"members in {r}: " + (", ".join(members) if members else "(none)"), ) return True @@ -1573,7 +2086,7 @@ class HubService: if cmd == "kick": if len(parts) < 3: self._emit_notice( - outgoing, link, room, "usage: /kick " + outgoing, link, None, "usage: /kick " ) return True target_room = parts[1] @@ -1632,15 +2145,16 @@ class HubService: link, src=self.identity.hash, text="not authorized", - room=room, + room=None, ) return True + # Hub-level command - all responses without room field if len(parts) < 2: self._emit_notice( outgoing, link, - room, + None, "usage: /kline add|del|list [nick|hashprefix|hash]", ) return True @@ -1651,7 +2165,7 @@ class HubService: self._emit_notice( outgoing, link, - room, + None, "klines: " + (", ".join(items) if items else "(none)"), ) return True @@ -1660,14 +2174,14 @@ class HubService: self._emit_notice( outgoing, link, - room, + None, "usage: /kline add|del|list [nick|hashprefix|hash]", ) return True if len(parts) < 3: self._emit_notice( - outgoing, link, room, f"usage: /kline {op} " + outgoing, link, None, f"usage: /kline {op} " ) return True @@ -1679,48 +2193,48 @@ class HubService: ph = tsess.get("peer") if tsess else None if isinstance(ph, (bytes, bytearray)): self._banned.add(bytes(ph)) - self._persist_banned_identities_to_config(link, room, outgoing) + self._persist_banned_identities_to_config(link, None, outgoing) try: target_link.teardown() except Exception: pass - self._emit_notice(outgoing, link, room, f"kline added for {target}") + self._emit_notice(outgoing, link, None, f"kline added for {target}") return True try: h = self._parse_identity_hash(target) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad identity hash: {e}") + self._emit_notice(outgoing, link, None, f"bad identity hash: {e}") return True self._banned.add(h) - self._persist_banned_identities_to_config(link, room, outgoing) - self._emit_notice(outgoing, link, room, f"kline added for {h.hex()}") + self._persist_banned_identities_to_config(link, None, outgoing) + self._emit_notice(outgoing, link, None, f"kline added for {h.hex()}") return True # op == "del" try: h = self._parse_identity_hash(target) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad identity hash: {e}") + self._emit_notice(outgoing, link, None, f"bad identity hash: {e}") return True if h in self._banned: self._banned.discard(h) - self._persist_banned_identities_to_config(link, room, outgoing) - self._emit_notice(outgoing, link, room, f"kline removed for {h.hex()}") + self._persist_banned_identities_to_config(link, None, outgoing) + self._emit_notice(outgoing, link, None, f"kline removed for {h.hex()}") else: - self._emit_notice(outgoing, link, room, f"not klined: {h.hex()}") + self._emit_notice(outgoing, link, None, f"not klined: {h.hex()}") return True # Room-scoped moderation and maintenance if cmd == "register": if len(parts) < 2: - self._emit_notice(outgoing, link, room, "usage: /register ") + self._emit_notice(outgoing, link, None, "usage: /register ") return True try: r = self._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self._emit_notice(outgoing, link, None, f"bad room: {e}") return True # Registration rules: requester must be in the room and must be the founder. # (No server-op override by design.) @@ -1792,12 +2306,12 @@ class HubService: if cmd == "unregister": if len(parts) < 2: - self._emit_notice(outgoing, link, room, "usage: /unregister ") + self._emit_notice(outgoing, link, None, "usage: /unregister ") return True try: r = self._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self._emit_notice(outgoing, link, None, f"bad room: {e}") return True if ( @@ -1840,12 +2354,12 @@ class HubService: if cmd == "topic": if len(parts) < 2: - self._emit_notice(outgoing, link, room, "usage: /topic [topic]") + self._emit_notice(outgoing, link, None, "usage: /topic [topic]") return True try: r = self._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self._emit_notice(outgoing, link, None, f"bad room: {e}") return True st = self._room_state_ensure(r) if len(parts) == 2: @@ -1888,13 +2402,13 @@ class HubService: if cmd in ("op", "deop", "voice", "devoice"): if len(parts) < 3: self._emit_notice( - outgoing, link, room, f"usage: /{cmd} " + outgoing, link, None, f"usage: /{cmd} " ) return True try: r = self._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self._emit_notice(outgoing, link, None, f"bad room: {e}") return True if not self._is_room_op(r, peer_hash): if self.identity is not None: @@ -1961,14 +2475,14 @@ class HubService: self._emit_notice( outgoing, link, - room, + None, "usage: /mode (+m|-m|+i|-i|+t|-t|+n|-n|+k|-k|+r|-r) [key] | /mode (+o|-o|+v|-v) ", ) return True try: r = self._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self._emit_notice(outgoing, link, None, f"bad room: {e}") return True if not self._is_room_op(r, peer_hash): if self.identity is not None: @@ -2118,7 +2632,7 @@ class HubService: self._emit_notice( outgoing, link, - room, + None, "usage: /ban add|del|list [nick|hashprefix|hash]", ) return True @@ -2126,7 +2640,7 @@ class HubService: try: r = self._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self._emit_notice(outgoing, link, None, f"bad room: {e}") return True op = parts[2].strip().lower() @@ -2219,7 +2733,7 @@ class HubService: self._emit_notice( outgoing, link, - room, + None, "usage: /invite add|del|list [nick|hashprefix|hash]", ) return True @@ -2227,7 +2741,7 @@ class HubService: try: r = self._norm_room(parts[1]) except Exception as e: - self._emit_notice(outgoing, link, room, f"bad room: {e}") + self._emit_notice(outgoing, link, None, f"bad room: {e}") return True if not self._is_room_op(r, peer_hash): @@ -2396,6 +2910,10 @@ class HubService: tokens=float(self.config.rate_limit_msgs_per_minute), last_refill=time.monotonic(), ) + + # Initialize resource tracking for this link + self._resource_expectations[link] = {} + self._active_resources[link] = set() link.set_packet_callback(lambda data, pkt: self._on_packet(link, data)) link.set_link_closed_callback(lambda closed_link: self._on_close(closed_link)) @@ -2404,6 +2922,23 @@ class HubService: identified_link, ident ) ) + + # Set up resource callbacks + if self.config.enable_resource_transfer: + try: + link.set_resource_strategy(RNS.Link.ACCEPT_APP) + link.set_resource_callback(self._resource_advertised) + link.set_resource_concluded_callback(self._resource_concluded) + self.log.debug( + "Resource callbacks configured link_id=%s", + self._fmt_link_id(link), + ) + except Exception as e: + self.log.warning( + "Failed to set resource callbacks link_id=%s: %s", + self._fmt_link_id(link), + e, + ) self.log.info("Link established link_id=%s", self._fmt_link_id(link)) @@ -2455,13 +2990,13 @@ class HubService: sess["welcomed"] = True # Use the queued path so we can preflight MTU sizing and optionally - # follow up with NOTICE chunks (e.g. greeting). + # follow up with MOTD via resource or chunks. outgoing: list[tuple[RNS.Link, bytes]] = [] self._queue_welcome( outgoing, link, peer_hash=sess.get("peer"), - greeting=self.config.greeting, + motd=self.config.greeting, ) for out_link, payload in outgoing: self._inc("bytes_out", len(payload)) @@ -2490,6 +3025,11 @@ class HubService: with self._state_lock: sess = self.sessions.pop(link, None) self._rate.pop(link, None) + + # Clean up resource state + self._resource_expectations.pop(link, None) + self._active_resources.pop(link, None) + if not sess: return @@ -2574,6 +3114,13 @@ class HubService: with self._state_lock: self._on_packet_locked(link, data, outgoing) + if self.log.isEnabledFor(logging.DEBUG) and outgoing: + self.log.debug( + "Sending %d response(s) link_id=%s", + len(outgoing), + self._fmt_link_id(link), + ) + for out_link, payload in outgoing: self._inc("bytes_out", len(payload)) try: @@ -2675,6 +3222,117 @@ class HubService: sess["awaiting_pong"] = None return + if t == T_RESOURCE_ENVELOPE: + # Handle resource envelope announcement + if not self.config.enable_resource_transfer: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource transfer disabled", + room=room, + ) + return + + if not isinstance(body, dict): + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="invalid resource envelope body", + room=room, + ) + return + + rid = body.get(B_RES_ID) + kind = body.get(B_RES_KIND) + size = body.get(B_RES_SIZE) + sha256 = body.get(B_RES_SHA256) + encoding = body.get(B_RES_ENCODING) + + # Validate required fields + if not isinstance(rid, (bytes, bytearray)): + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource envelope missing id", + room=room, + ) + return + + if not isinstance(kind, str) or not kind: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource envelope missing kind", + room=room, + ) + return + + if not isinstance(size, int) or size < 0: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource envelope invalid size", + room=room, + ) + return + + # Check size limit + if size > self.config.max_resource_bytes: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text=f"resource too large: {size} > {self.config.max_resource_bytes}", + room=room, + ) + return + + # Validate optional fields + if sha256 is not None and not isinstance(sha256, (bytes, bytearray)): + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource envelope invalid sha256", + room=room, + ) + return + + if encoding is not None and not isinstance(encoding, str): + encoding = None + + # Add expectation + if not self._add_resource_expectation( + link, + rid=bytes(rid), + kind=kind, + size=size, + sha256=bytes(sha256) if sha256 else None, + encoding=encoding, + room=room, + ): + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="too many pending resource expectations", + room=room, + ) + return + if not sess["welcomed"]: if t != T_HELLO: if self.identity is not None: @@ -2712,7 +3370,7 @@ class HubService: outgoing, link, peer_hash=peer_hash, - greeting=self.config.greeting, + motd=self.config.greeting, ) return @@ -2754,7 +3412,7 @@ class HubService: outgoing, link, peer_hash=peer_hash, - greeting=self.config.greeting, + motd=self.config.greeting, ) return @@ -2955,15 +3613,57 @@ class HubService: return if t in (T_MSG, T_NOTICE): - if not isinstance(room, str) or not room: - if self.identity is not None: - self._emit_error( - outgoing, - link, - src=self.identity.hash, - text="message requires room name", + # Check for slash commands first, as they may not require a room. + # Per RRC spec, the room field is optional and may be empty. + if isinstance(body, str): + cmdline = body.strip() + if cmdline.startswith("/"): + # It's a slash command - attempt to handle it + if self.log.isEnabledFor(logging.DEBUG): + self.log.debug( + "Slash command peer=%s link_id=%s cmd=%r room=%r", + self._fmt_hash(peer_hash), + self._fmt_link_id(link), + cmdline, + room, + ) + handled = self._handle_operator_command( + link, peer_hash=peer_hash, room=room, text=body, outgoing=outgoing ) - return + if handled: + if self.log.isEnabledFor(logging.DEBUG): + self.log.debug( + "Slash command handled, queued=%d responses", + len(outgoing), + ) + return + # Unrecognized slash command - send error + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="unrecognized command", + room=room, + ) + return + + # NOTICE messages are informational/non-conversational and don't require a room. + # MSG messages require a room for delivery. + if t == T_MSG: + if not isinstance(room, str) or not room: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="message requires room name", + ) + return + elif t == T_NOTICE: + # NOTICE without a room is allowed - just don't forward it anywhere + if not isinstance(room, str) or not room: + return try: r = self._norm_room(room) @@ -3027,11 +3727,6 @@ class HubService: ) return - if isinstance(body, str) and self._handle_operator_command( - link, peer_hash=peer_hash, room=r, text=body, outgoing=outgoing - ): - return - if peer_hash is not None: env[K_SRC] = ( bytes(peer_hash) diff --git a/tests/test_resource.py b/tests/test_resource.py new file mode 100644 index 0000000..838d439 --- /dev/null +++ b/tests/test_resource.py @@ -0,0 +1,97 @@ +"""Tests for resource transfer functionality.""" +import hashlib +import os + +from rrcd.codec import decode, encode +from rrcd.constants import ( + B_RES_ENCODING, + B_RES_ID, + B_RES_KIND, + B_RES_SHA256, + B_RES_SIZE, + K_BODY, + K_SRC, + K_T, + RES_KIND_NOTICE, + T_RESOURCE_ENVELOPE, +) +from rrcd.envelope import make_envelope + + +def test_resource_envelope_serialization(): + """Test that resource envelopes can be created and serialized.""" + src = os.urandom(16) + rid = os.urandom(8) + payload = b"This is a test payload that is larger than typical MDU" + sha256 = hashlib.sha256(payload).digest() + + body = { + B_RES_ID: rid, + B_RES_KIND: RES_KIND_NOTICE, + B_RES_SIZE: len(payload), + B_RES_SHA256: sha256, + B_RES_ENCODING: "utf-8", + } + + envelope = make_envelope( + T_RESOURCE_ENVELOPE, + src=src, + room="test", + body=body, + ) + + # Serialize and deserialize + encoded = encode(envelope) + decoded = decode(encoded) + + assert decoded[K_T] == T_RESOURCE_ENVELOPE + assert decoded[K_SRC] == src + + decoded_body = decoded[K_BODY] + assert decoded_body[B_RES_ID] == rid + assert decoded_body[B_RES_KIND] == RES_KIND_NOTICE + assert decoded_body[B_RES_SIZE] == len(payload) + assert decoded_body[B_RES_SHA256] == sha256 + assert decoded_body[B_RES_ENCODING] == "utf-8" + + +def test_resource_envelope_minimal(): + """Test resource envelope with minimal required fields.""" + src = os.urandom(16) + rid = os.urandom(8) + + body = { + B_RES_ID: rid, + B_RES_KIND: "blob", + B_RES_SIZE: 1024, + } + + envelope = make_envelope( + T_RESOURCE_ENVELOPE, + src=src, + body=body, + ) + + encoded = encode(envelope) + decoded = decode(encoded) + + decoded_body = decoded[K_BODY] + assert B_RES_SHA256 not in decoded_body + assert B_RES_ENCODING not in decoded_body + assert decoded_body[B_RES_SIZE] == 1024 + + +def test_sha256_verification(): + """Test SHA256 hash computation for payload verification.""" + payload = b"Test payload for SHA256 verification" + expected = hashlib.sha256(payload).digest() + + # Verify we can compute and compare hashes correctly + computed = hashlib.sha256(payload).digest() + assert computed == expected + assert len(computed) == 32 + + # Verify mismatch detection + wrong_payload = b"Different payload" + wrong_hash = hashlib.sha256(wrong_payload).digest() + assert wrong_hash != expected