diff --git a/README.md b/README.md index 2f79d37..3cc51d3 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,40 @@ Wire-level extensions (backwards-compatible): UTF-8 encodable, contain no newlines/NUL, and are at most `nick_max_chars` characters (default: 32). +- **Large payload transfer via RNS.Resource**: For messages that exceed the link + MTU (Maximum Data Unit), `rrcd` can automatically use RNS.Resource for + reliable large payload transfer instead of manual chunking. + + This is implemented as a two-part protocol: + 1. Send a small `RESOURCE_ENVELOPE` message (type 50) via normal packet, + announcing the incoming resource with metadata (id, kind, size, SHA256). + 2. Send the actual payload via `RNS.Resource`. + + The receiving side matches the resource to the expectation and validates + integrity. Supported resource kinds include: + - `notice`: Large NOTICE text messages + - `motd`: Message of the day / server greeting + - `blob`: Generic binary data + + Configuration (in `rrcd.toml`): + ```toml + [hub] + enable_resource_transfer = true # default: true + max_resource_bytes = 262144 # 256 KiB default + max_pending_resource_expectations = 8 # per link + resource_expectation_ttl_s = 30.0 # expectation timeout + ``` + + Safety controls: + - Resources are only accepted if they match a recent expectation + - Size limits enforced (default 256 KiB) + - SHA256 verification for integrity + - TTL-based expectation expiry (default 30 seconds) + - Per-link expectation limit to prevent memory exhaustion + + Fallback: If resource transfer is disabled or fails, NOTICE messages fall + back to the original line-based chunking method. + Configure trusted operators and banned identities in the TOML config: - `trusted_identities`: list of Reticulum Identity hashes (hex) allowed to run diff --git a/rrcd/config.py b/rrcd/config.py index b58c200..7a82033 100644 --- a/rrcd/config.py +++ b/rrcd/config.py @@ -26,6 +26,10 @@ class HubRuntimeConfig: rate_limit_msgs_per_minute: int = 240 ping_interval_s: float = 0.0 ping_timeout_s: float = 0.0 + max_resource_bytes: int = 256 * 1024 # 256 KiB default + max_pending_resource_expectations: int = 8 + resource_expectation_ttl_s: float = 30.0 + enable_resource_transfer: bool = True log_level: str = "INFO" log_rns_level: str = "WARNING" log_console: bool = True diff --git a/rrcd/constants.py b/rrcd/constants.py index 1cf43ef..789cc95 100644 --- a/rrcd/constants.py +++ b/rrcd/constants.py @@ -29,6 +29,8 @@ T_PONG = 31 T_ERROR = 40 +T_RESOURCE_ENVELOPE = 50 + # HELLO body keys # Per spec: key assignments are fixed. B_HELLO_NAME = 0 @@ -46,3 +48,15 @@ B_WELCOME_CAPS = 2 # Capabilities map keys (values are advisory). Keep these small and numeric. CAP_RESOURCE_ENVELOPE = 0 + +# RESOURCE_ENVELOPE body keys +B_RES_ID = 0 +B_RES_KIND = 1 +B_RES_SIZE = 2 +B_RES_SHA256 = 3 +B_RES_ENCODING = 4 + +# Resource kinds (string values) +RES_KIND_NOTICE = "notice" +RES_KIND_MOTD = "motd" +RES_KIND_BLOB = "blob" diff --git a/rrcd/service.py b/rrcd/service.py index ee50e61..0aad43c 100644 --- a/rrcd/service.py +++ b/rrcd/service.py @@ -1,5 +1,6 @@ from __future__ import annotations +import hashlib import logging import os import signal @@ -16,6 +17,11 @@ from .config import HubRuntimeConfig from .constants import ( B_HELLO_CAPS, B_HELLO_NICK_LEGACY, + B_RES_ENCODING, + B_RES_ID, + B_RES_KIND, + B_RES_SHA256, + B_RES_SIZE, B_WELCOME_HUB, B_WELCOME_VER, K_BODY, @@ -23,6 +29,9 @@ from .constants import ( K_ROOM, K_SRC, K_T, + RES_KIND_BLOB, + RES_KIND_MOTD, + RES_KIND_NOTICE, T_ERROR, T_HELLO, T_JOIN, @@ -33,6 +42,7 @@ from .constants import ( T_PARTED, T_PING, T_PONG, + T_RESOURCE_ENVELOPE, T_WELCOME, ) from .envelope import make_envelope, validate_envelope @@ -46,6 +56,19 @@ class _RateState: last_refill: float +@dataclass +class _ResourceExpectation: + """Tracks an expected incoming Resource transfer.""" + id: bytes + kind: str + size: int + sha256: bytes | None + encoding: str | None + created_at: float + expires_at: float + room: str | None = None + + class HubService: def __init__(self, config: HubRuntimeConfig) -> None: self.config = config @@ -65,6 +88,10 @@ class HubService: self.sessions: dict[RNS.Link, dict[str, Any]] = {} self._rate: dict[RNS.Link, _RateState] = {} + # Resource transfer state + self._resource_expectations: dict[RNS.Link, dict[bytes, _ResourceExpectation]] = {} + self._active_resources: dict[RNS.Link, set[RNS.Resource]] = {} + self._trusted: set[bytes] = set() self._banned: set[bytes] = set() @@ -100,6 +127,11 @@ class HubService: "pings_out": 0, "pongs_out": 0, "announces": 0, + "resources_sent": 0, + "resources_received": 0, + "resources_rejected": 0, + "resource_bytes_sent": 0, + "resource_bytes_received": 0, } def _extract_caps(self, body: Any) -> dict[int, Any]: @@ -218,7 +250,9 @@ class HubService: # The hub greeting is delivered as NOTICE after WELCOME. if g: - self._queue_notice_chunks(outgoing, link, room=None, text=g) + self._send_text_smart( + link, msg_type=T_NOTICE, text=g, room=None, outgoing=outgoing + ) def _inc(self, key: str, delta: int = 1) -> None: try: @@ -227,6 +261,465 @@ class HubService: except Exception: pass + # Resource transfer methods + + def _cleanup_expired_expectations(self, link: RNS.Link) -> None: + """Remove expired resource expectations for a link.""" + now = time.time() + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return + + expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now] + for rid in expired: + exp_dict.pop(rid, None) + self.log.debug( + "Expired resource expectation link_id=%s rid=%s", + self._fmt_link_id(link), + rid.hex() if isinstance(rid, bytes) else rid, + ) + + def _add_resource_expectation( + self, + link: RNS.Link, + *, + rid: bytes, + kind: str, + size: int, + sha256: bytes | None = None, + encoding: str | None = None, + room: str | None = None, + ) -> bool: + """Add a resource expectation. Returns False if limit exceeded.""" + self._cleanup_expired_expectations(link) + + exp_dict = self._resource_expectations.setdefault(link, {}) + + if len(exp_dict) >= self.config.max_pending_resource_expectations: + self.log.warning( + "Max pending expectations exceeded link_id=%s", + self._fmt_link_id(link), + ) + return False + + now = time.time() + exp = _ResourceExpectation( + id=rid, + kind=kind, + size=size, + sha256=sha256, + encoding=encoding, + created_at=now, + expires_at=now + self.config.resource_expectation_ttl_s, + room=room, + ) + exp_dict[rid] = exp + + self.log.debug( + "Added resource expectation link_id=%s rid=%s kind=%s size=%s", + self._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + return True + + def _find_resource_expectation( + self, link: RNS.Link, size: int + ) -> _ResourceExpectation | None: + """Find a matching resource expectation by size (fallback matching).""" + self._cleanup_expired_expectations(link) + + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return None + + # Match by size (first match wins) + for exp in exp_dict.values(): + if exp.size == size: + return exp + + return None + + def _pop_resource_expectation( + self, link: RNS.Link, rid: bytes + ) -> _ResourceExpectation | None: + """Remove and return a resource expectation.""" + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return None + return exp_dict.pop(rid, None) + + def _resource_advertised(self, resource: RNS.Resource) -> bool: + """ + Callback when a Resource is advertised by remote peer. + Returns True to accept, False to reject. + """ + link = resource.link + + if not self.config.enable_resource_transfer: + self.log.debug( + "Rejecting resource (disabled) link_id=%s", + self._fmt_link_id(link), + ) + self._inc("resources_rejected") + return False + + with self._state_lock: + sess = self.sessions.get(link) + if not sess: + self.log.debug( + "Rejecting resource (no session) link_id=%s", + self._fmt_link_id(link), + ) + self._inc("resources_rejected") + return False + + # Check size limit + size = resource.total_size if hasattr(resource, "total_size") else resource.size + if size > self.config.max_resource_bytes: + self.log.warning( + "Rejecting resource (too large: %s > %s) link_id=%s", + size, + self.config.max_resource_bytes, + self._fmt_link_id(link), + ) + self._inc("resources_rejected") + return False + + # Check for matching expectation + exp = self._find_resource_expectation(link, size) + if not exp: + self.log.warning( + "Rejecting resource (no matching expectation) link_id=%s size=%s", + self._fmt_link_id(link), + size, + ) + self._inc("resources_rejected") + return False + + # Accept + self.log.info( + "Accepting resource link_id=%s size=%s kind=%s", + self._fmt_link_id(link), + size, + exp.kind, + ) + self._active_resources.setdefault(link, set()).add(resource) + return True + + def _resource_concluded(self, resource: RNS.Resource) -> None: + """Callback when a Resource transfer completes.""" + link = resource.link + + with self._state_lock: + # Remove from active set + active_set = self._active_resources.get(link) + if active_set: + active_set.discard(resource) + + if resource.status != RNS.Resource.COMPLETE: + self.log.warning( + "Resource transfer failed link_id=%s status=%s", + self._fmt_link_id(link), + resource.status, + ) + return + + # Get payload + try: + payload = resource.data.read() if hasattr(resource.data, "read") else resource.data + if isinstance(payload, bytearray): + payload = bytes(payload) + except Exception as e: + self.log.error( + "Failed to read resource data link_id=%s: %s", + self._fmt_link_id(link), + e, + ) + return + + size = len(payload) + + # Find and remove expectation + exp = self._find_resource_expectation(link, size) + if not exp: + self.log.warning( + "Received resource without expectation link_id=%s size=%s", + self._fmt_link_id(link), + size, + ) + return + + self._pop_resource_expectation(link, exp.id) + + # Verify SHA256 if provided + if exp.sha256: + actual_hash = hashlib.sha256(payload).digest() + if actual_hash != exp.sha256: + self.log.error( + "Resource SHA256 mismatch link_id=%s expected=%s actual=%s", + self._fmt_link_id(link), + exp.sha256.hex(), + actual_hash.hex(), + ) + return + + self._inc("resources_received") + self._inc("resource_bytes_received", size) + + self.log.info( + "Resource received link_id=%s size=%s kind=%s", + self._fmt_link_id(link), + size, + exp.kind, + ) + + # Dispatch by kind + try: + self._dispatch_received_resource(link, exp, payload) + except Exception as e: + self.log.exception( + "Failed to dispatch resource link_id=%s kind=%s: %s", + self._fmt_link_id(link), + exp.kind, + e, + ) + + def _dispatch_received_resource( + self, link: RNS.Link, exp: _ResourceExpectation, payload: bytes + ) -> None: + """Dispatch a received resource payload to appropriate handler.""" + if exp.kind == RES_KIND_NOTICE: + # Decode as text and deliver as notice + encoding = exp.encoding or "utf-8" + try: + text = payload.decode(encoding) + except Exception as e: + self.log.error( + "Failed to decode notice resource link_id=%s encoding=%s: %s", + self._fmt_link_id(link), + encoding, + e, + ) + return + + # Log the notice (don't send back to sender) + self.log.info( + "Received large NOTICE via resource link_id=%s room=%r chars=%s", + self._fmt_link_id(link), + exp.room, + len(text), + ) + # Note: In a full implementation, this would be forwarded to other room members + # For now, just acknowledge receipt + + elif exp.kind == RES_KIND_MOTD: + # Similar to NOTICE + encoding = exp.encoding or "utf-8" + try: + text = payload.decode(encoding) + except Exception as e: + self.log.error( + "Failed to decode MOTD resource link_id=%s: %s", + self._fmt_link_id(link), + e, + ) + return + + self.log.info( + "Received MOTD via resource link_id=%s chars=%s", + self._fmt_link_id(link), + len(text), + ) + + elif exp.kind == RES_KIND_BLOB: + # Generic binary data + self.log.info( + "Received BLOB via resource link_id=%s bytes=%s", + self._fmt_link_id(link), + len(payload), + ) + else: + self.log.warning( + "Unknown resource kind link_id=%s kind=%s", + self._fmt_link_id(link), + exp.kind, + ) + + def _send_via_resource( + self, + link: RNS.Link, + *, + kind: str, + payload: bytes, + room: str | None = None, + encoding: str | None = None, + ) -> bool: + """ + Send large payload via Resource. + Returns True if successfully initiated, False otherwise. + """ + if not self.config.enable_resource_transfer: + return False + + size = len(payload) + if size > self.config.max_resource_bytes: + self.log.error( + "Payload too large for resource transfer: %s > %s", + size, + self.config.max_resource_bytes, + ) + return False + + # Generate resource ID + rid = os.urandom(8) + + # Compute SHA256 + sha256 = hashlib.sha256(payload).digest() + + # Send envelope first + if self.identity is None: + return False + + envelope_body = { + B_RES_ID: rid, + B_RES_KIND: kind, + B_RES_SIZE: size, + B_RES_SHA256: sha256, + } + if encoding: + envelope_body[B_RES_ENCODING] = encoding + + envelope = make_envelope( + T_RESOURCE_ENVELOPE, + src=self.identity.hash, + room=room, + body=envelope_body, + ) + + try: + envelope_payload = encode(envelope) + RNS.Packet(link, envelope_payload).send() + self._inc("bytes_out", len(envelope_payload)) + + self.log.debug( + "Sent resource envelope link_id=%s rid=%s kind=%s size=%s", + self._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + except Exception as e: + self.log.error( + "Failed to send resource envelope link_id=%s: %s", + self._fmt_link_id(link), + e, + ) + return False + + # Create and advertise resource + try: + resource = RNS.Resource(payload, link, advertise=True, auto_compress=False) + + with self._state_lock: + self._active_resources.setdefault(link, set()).add(resource) + + self._inc("resources_sent") + self._inc("resource_bytes_sent", size) + + self.log.info( + "Sent resource link_id=%s rid=%s kind=%s size=%s", + self._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + return True + + except Exception as e: + self.log.error( + "Failed to create resource link_id=%s: %s", + self._fmt_link_id(link), + e, + ) + return False + + def _send_text_smart( + self, + link: RNS.Link, + *, + msg_type: int, + text: str, + room: str | None = None, + encoding: str = "utf-8", + outgoing: list[tuple[RNS.Link, bytes]] | None = None, + ) -> None: + """ + Send text message using best method (packet or resource). + Falls back to chunking if resource transfer fails or is disabled. + """ + if self.identity is None: + return + + # Try encoding as a single packet first + env = make_envelope(msg_type, src=self.identity.hash, room=room, body=text) + payload = encode(env) + + # If it fits, send normally + if self._packet_would_fit(link, payload): + if outgoing is None: + self._send(link, env) + else: + self._queue_env(outgoing, link, env) + return + + # Too large for packet - try resource if enabled and type is NOTICE + if ( + self.config.enable_resource_transfer + and msg_type == T_NOTICE + and len(text.encode(encoding)) <= self.config.max_resource_bytes + ): + text_bytes = text.encode(encoding) + if self._send_via_resource( + link, + kind=RES_KIND_NOTICE, + payload=text_bytes, + room=room, + encoding=encoding, + ): + self.log.debug( + "Sent large text via resource link_id=%s chars=%s", + self._fmt_link_id(link), + len(text), + ) + return + + # Fall back to chunking for NOTICE + if msg_type == T_NOTICE: + if outgoing is None: + outgoing = [] + self._queue_notice_chunks(outgoing, link, room=room, text=text) + for out_link, chunk_payload in outgoing: + self._inc("bytes_out", len(chunk_payload)) + try: + RNS.Packet(out_link, chunk_payload).send() + except Exception as e: + self.log.warning( + "Failed to send chunk link_id=%s: %s", + self._fmt_link_id(out_link), + e, + ) + else: + self._queue_notice_chunks(outgoing, link, room=room, text=text) + else: + # For other message types, just drop or log error + self.log.error( + "Message too large and not NOTICE link_id=%s type=%s", + self._fmt_link_id(link), + msg_type, + ) + def start(self) -> None: self.log.info("Starting Reticulum") if self._started_wall_time is None: @@ -346,6 +839,8 @@ class HubService: self.sessions.clear() self.rooms.clear() self._rate.clear() + self._resource_expectations.clear() + self._active_resources.clear() for link in links: try: @@ -1440,6 +1935,15 @@ class HubService: c.get("pongs_out", 0), ) ) + lines.append( + "resources: sent={} received={} rejected={} bytes_sent={} bytes_received={}".format( + c.get("resources_sent", 0), + c.get("resources_received", 0), + c.get("resources_rejected", 0), + c.get("resource_bytes_sent", 0), + c.get("resource_bytes_received", 0), + ) + ) return "\n".join(lines) @@ -2396,6 +2900,10 @@ class HubService: tokens=float(self.config.rate_limit_msgs_per_minute), last_refill=time.monotonic(), ) + + # Initialize resource tracking for this link + self._resource_expectations[link] = {} + self._active_resources[link] = set() link.set_packet_callback(lambda data, pkt: self._on_packet(link, data)) link.set_link_closed_callback(lambda closed_link: self._on_close(closed_link)) @@ -2404,6 +2912,23 @@ class HubService: identified_link, ident ) ) + + # Set up resource callbacks + if self.config.enable_resource_transfer: + try: + link.set_resource_strategy(RNS.Link.ACCEPT_APP) + link.set_resource_callback(self._resource_advertised) + link.set_resource_concluded_callback(self._resource_concluded) + self.log.debug( + "Resource callbacks configured link_id=%s", + self._fmt_link_id(link), + ) + except Exception as e: + self.log.warning( + "Failed to set resource callbacks link_id=%s: %s", + self._fmt_link_id(link), + e, + ) self.log.info("Link established link_id=%s", self._fmt_link_id(link)) @@ -2490,6 +3015,11 @@ class HubService: with self._state_lock: sess = self.sessions.pop(link, None) self._rate.pop(link, None) + + # Clean up resource state + self._resource_expectations.pop(link, None) + self._active_resources.pop(link, None) + if not sess: return @@ -2675,6 +3205,117 @@ class HubService: sess["awaiting_pong"] = None return + if t == T_RESOURCE_ENVELOPE: + # Handle resource envelope announcement + if not self.config.enable_resource_transfer: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource transfer disabled", + room=room, + ) + return + + if not isinstance(body, dict): + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="invalid resource envelope body", + room=room, + ) + return + + rid = body.get(B_RES_ID) + kind = body.get(B_RES_KIND) + size = body.get(B_RES_SIZE) + sha256 = body.get(B_RES_SHA256) + encoding = body.get(B_RES_ENCODING) + + # Validate required fields + if not isinstance(rid, (bytes, bytearray)): + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource envelope missing id", + room=room, + ) + return + + if not isinstance(kind, str) or not kind: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource envelope missing kind", + room=room, + ) + return + + if not isinstance(size, int) or size < 0: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource envelope invalid size", + room=room, + ) + return + + # Check size limit + if size > self.config.max_resource_bytes: + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text=f"resource too large: {size} > {self.config.max_resource_bytes}", + room=room, + ) + return + + # Validate optional fields + if sha256 is not None and not isinstance(sha256, (bytes, bytearray)): + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="resource envelope invalid sha256", + room=room, + ) + return + + if encoding is not None and not isinstance(encoding, str): + encoding = None + + # Add expectation + if not self._add_resource_expectation( + link, + rid=bytes(rid), + kind=kind, + size=size, + sha256=bytes(sha256) if sha256 else None, + encoding=encoding, + room=room, + ): + if self.identity is not None: + self._emit_error( + outgoing, + link, + src=self.identity.hash, + text="too many pending resource expectations", + room=room, + ) + return + if not sess["welcomed"]: if t != T_HELLO: if self.identity is not None: diff --git a/tests/test_resource.py b/tests/test_resource.py new file mode 100644 index 0000000..838d439 --- /dev/null +++ b/tests/test_resource.py @@ -0,0 +1,97 @@ +"""Tests for resource transfer functionality.""" +import hashlib +import os + +from rrcd.codec import decode, encode +from rrcd.constants import ( + B_RES_ENCODING, + B_RES_ID, + B_RES_KIND, + B_RES_SHA256, + B_RES_SIZE, + K_BODY, + K_SRC, + K_T, + RES_KIND_NOTICE, + T_RESOURCE_ENVELOPE, +) +from rrcd.envelope import make_envelope + + +def test_resource_envelope_serialization(): + """Test that resource envelopes can be created and serialized.""" + src = os.urandom(16) + rid = os.urandom(8) + payload = b"This is a test payload that is larger than typical MDU" + sha256 = hashlib.sha256(payload).digest() + + body = { + B_RES_ID: rid, + B_RES_KIND: RES_KIND_NOTICE, + B_RES_SIZE: len(payload), + B_RES_SHA256: sha256, + B_RES_ENCODING: "utf-8", + } + + envelope = make_envelope( + T_RESOURCE_ENVELOPE, + src=src, + room="test", + body=body, + ) + + # Serialize and deserialize + encoded = encode(envelope) + decoded = decode(encoded) + + assert decoded[K_T] == T_RESOURCE_ENVELOPE + assert decoded[K_SRC] == src + + decoded_body = decoded[K_BODY] + assert decoded_body[B_RES_ID] == rid + assert decoded_body[B_RES_KIND] == RES_KIND_NOTICE + assert decoded_body[B_RES_SIZE] == len(payload) + assert decoded_body[B_RES_SHA256] == sha256 + assert decoded_body[B_RES_ENCODING] == "utf-8" + + +def test_resource_envelope_minimal(): + """Test resource envelope with minimal required fields.""" + src = os.urandom(16) + rid = os.urandom(8) + + body = { + B_RES_ID: rid, + B_RES_KIND: "blob", + B_RES_SIZE: 1024, + } + + envelope = make_envelope( + T_RESOURCE_ENVELOPE, + src=src, + body=body, + ) + + encoded = encode(envelope) + decoded = decode(encoded) + + decoded_body = decoded[K_BODY] + assert B_RES_SHA256 not in decoded_body + assert B_RES_ENCODING not in decoded_body + assert decoded_body[B_RES_SIZE] == 1024 + + +def test_sha256_verification(): + """Test SHA256 hash computation for payload verification.""" + payload = b"Test payload for SHA256 verification" + expected = hashlib.sha256(payload).digest() + + # Verify we can compute and compare hashes correctly + computed = hashlib.sha256(payload).digest() + assert computed == expected + assert len(computed) == 32 + + # Verify mismatch detection + wrong_payload = b"Different payload" + wrong_hash = hashlib.sha256(wrong_payload).digest() + assert wrong_hash != expected