implement large payload transfer via RNS.Resource with resource envelope handling

This commit is contained in:
kc1awv
2025-12-31 17:00:56 -05:00
parent 0bba49b0a4
commit 311a5ba594
5 changed files with 791 additions and 1 deletions
+34
View File
@@ -139,6 +139,40 @@ Wire-level extensions (backwards-compatible):
UTF-8 encodable, contain no newlines/NUL, and are at most `nick_max_chars`
characters (default: 32).
- **Large payload transfer via RNS.Resource**: For messages that exceed the link
MTU (Maximum Data Unit), `rrcd` can automatically use RNS.Resource for
reliable large payload transfer instead of manual chunking.
This is implemented as a two-part protocol:
1. Send a small `RESOURCE_ENVELOPE` message (type 50) via normal packet,
announcing the incoming resource with metadata (id, kind, size, SHA256).
2. Send the actual payload via `RNS.Resource`.
The receiving side matches the resource to the expectation and validates
integrity. Supported resource kinds include:
- `notice`: Large NOTICE text messages
- `motd`: Message of the day / server greeting
- `blob`: Generic binary data
Configuration (in `rrcd.toml`):
```toml
[hub]
enable_resource_transfer = true # default: true
max_resource_bytes = 262144 # 256 KiB default
max_pending_resource_expectations = 8 # per link
resource_expectation_ttl_s = 30.0 # expectation timeout
```
Safety controls:
- Resources are only accepted if they match a recent expectation
- Size limits enforced (default 256 KiB)
- SHA256 verification for integrity
- TTL-based expectation expiry (default 30 seconds)
- Per-link expectation limit to prevent memory exhaustion
Fallback: If resource transfer is disabled or fails, NOTICE messages fall
back to the original line-based chunking method.
Configure trusted operators and banned identities in the TOML config:
- `trusted_identities`: list of Reticulum Identity hashes (hex) allowed to run
+4
View File
@@ -26,6 +26,10 @@ class HubRuntimeConfig:
rate_limit_msgs_per_minute: int = 240
ping_interval_s: float = 0.0
ping_timeout_s: float = 0.0
max_resource_bytes: int = 256 * 1024 # 256 KiB default
max_pending_resource_expectations: int = 8
resource_expectation_ttl_s: float = 30.0
enable_resource_transfer: bool = True
log_level: str = "INFO"
log_rns_level: str = "WARNING"
log_console: bool = True
+14
View File
@@ -29,6 +29,8 @@ T_PONG = 31
T_ERROR = 40
T_RESOURCE_ENVELOPE = 50
# HELLO body keys
# Per spec: key assignments are fixed.
B_HELLO_NAME = 0
@@ -46,3 +48,15 @@ B_WELCOME_CAPS = 2
# Capabilities map keys (values are advisory). Keep these small and numeric.
CAP_RESOURCE_ENVELOPE = 0
# RESOURCE_ENVELOPE body keys
B_RES_ID = 0
B_RES_KIND = 1
B_RES_SIZE = 2
B_RES_SHA256 = 3
B_RES_ENCODING = 4
# Resource kinds (string values)
RES_KIND_NOTICE = "notice"
RES_KIND_MOTD = "motd"
RES_KIND_BLOB = "blob"
+642 -1
View File
@@ -1,5 +1,6 @@
from __future__ import annotations
import hashlib
import logging
import os
import signal
@@ -16,6 +17,11 @@ from .config import HubRuntimeConfig
from .constants import (
B_HELLO_CAPS,
B_HELLO_NICK_LEGACY,
B_RES_ENCODING,
B_RES_ID,
B_RES_KIND,
B_RES_SHA256,
B_RES_SIZE,
B_WELCOME_HUB,
B_WELCOME_VER,
K_BODY,
@@ -23,6 +29,9 @@ from .constants import (
K_ROOM,
K_SRC,
K_T,
RES_KIND_BLOB,
RES_KIND_MOTD,
RES_KIND_NOTICE,
T_ERROR,
T_HELLO,
T_JOIN,
@@ -33,6 +42,7 @@ from .constants import (
T_PARTED,
T_PING,
T_PONG,
T_RESOURCE_ENVELOPE,
T_WELCOME,
)
from .envelope import make_envelope, validate_envelope
@@ -46,6 +56,19 @@ class _RateState:
last_refill: float
@dataclass
class _ResourceExpectation:
"""Tracks an expected incoming Resource transfer."""
id: bytes
kind: str
size: int
sha256: bytes | None
encoding: str | None
created_at: float
expires_at: float
room: str | None = None
class HubService:
def __init__(self, config: HubRuntimeConfig) -> None:
self.config = config
@@ -65,6 +88,10 @@ class HubService:
self.sessions: dict[RNS.Link, dict[str, Any]] = {}
self._rate: dict[RNS.Link, _RateState] = {}
# Resource transfer state
self._resource_expectations: dict[RNS.Link, dict[bytes, _ResourceExpectation]] = {}
self._active_resources: dict[RNS.Link, set[RNS.Resource]] = {}
self._trusted: set[bytes] = set()
self._banned: set[bytes] = set()
@@ -100,6 +127,11 @@ class HubService:
"pings_out": 0,
"pongs_out": 0,
"announces": 0,
"resources_sent": 0,
"resources_received": 0,
"resources_rejected": 0,
"resource_bytes_sent": 0,
"resource_bytes_received": 0,
}
def _extract_caps(self, body: Any) -> dict[int, Any]:
@@ -218,7 +250,9 @@ class HubService:
# The hub greeting is delivered as NOTICE after WELCOME.
if g:
self._queue_notice_chunks(outgoing, link, room=None, text=g)
self._send_text_smart(
link, msg_type=T_NOTICE, text=g, room=None, outgoing=outgoing
)
def _inc(self, key: str, delta: int = 1) -> None:
try:
@@ -227,6 +261,465 @@ class HubService:
except Exception:
pass
# Resource transfer methods
def _cleanup_expired_expectations(self, link: RNS.Link) -> None:
"""Remove expired resource expectations for a link."""
now = time.time()
exp_dict = self._resource_expectations.get(link)
if not exp_dict:
return
expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now]
for rid in expired:
exp_dict.pop(rid, None)
self.log.debug(
"Expired resource expectation link_id=%s rid=%s",
self._fmt_link_id(link),
rid.hex() if isinstance(rid, bytes) else rid,
)
def _add_resource_expectation(
self,
link: RNS.Link,
*,
rid: bytes,
kind: str,
size: int,
sha256: bytes | None = None,
encoding: str | None = None,
room: str | None = None,
) -> bool:
"""Add a resource expectation. Returns False if limit exceeded."""
self._cleanup_expired_expectations(link)
exp_dict = self._resource_expectations.setdefault(link, {})
if len(exp_dict) >= self.config.max_pending_resource_expectations:
self.log.warning(
"Max pending expectations exceeded link_id=%s",
self._fmt_link_id(link),
)
return False
now = time.time()
exp = _ResourceExpectation(
id=rid,
kind=kind,
size=size,
sha256=sha256,
encoding=encoding,
created_at=now,
expires_at=now + self.config.resource_expectation_ttl_s,
room=room,
)
exp_dict[rid] = exp
self.log.debug(
"Added resource expectation link_id=%s rid=%s kind=%s size=%s",
self._fmt_link_id(link),
rid.hex(),
kind,
size,
)
return True
def _find_resource_expectation(
self, link: RNS.Link, size: int
) -> _ResourceExpectation | None:
"""Find a matching resource expectation by size (fallback matching)."""
self._cleanup_expired_expectations(link)
exp_dict = self._resource_expectations.get(link)
if not exp_dict:
return None
# Match by size (first match wins)
for exp in exp_dict.values():
if exp.size == size:
return exp
return None
def _pop_resource_expectation(
self, link: RNS.Link, rid: bytes
) -> _ResourceExpectation | None:
"""Remove and return a resource expectation."""
exp_dict = self._resource_expectations.get(link)
if not exp_dict:
return None
return exp_dict.pop(rid, None)
def _resource_advertised(self, resource: RNS.Resource) -> bool:
"""
Callback when a Resource is advertised by remote peer.
Returns True to accept, False to reject.
"""
link = resource.link
if not self.config.enable_resource_transfer:
self.log.debug(
"Rejecting resource (disabled) link_id=%s",
self._fmt_link_id(link),
)
self._inc("resources_rejected")
return False
with self._state_lock:
sess = self.sessions.get(link)
if not sess:
self.log.debug(
"Rejecting resource (no session) link_id=%s",
self._fmt_link_id(link),
)
self._inc("resources_rejected")
return False
# Check size limit
size = resource.total_size if hasattr(resource, "total_size") else resource.size
if size > self.config.max_resource_bytes:
self.log.warning(
"Rejecting resource (too large: %s > %s) link_id=%s",
size,
self.config.max_resource_bytes,
self._fmt_link_id(link),
)
self._inc("resources_rejected")
return False
# Check for matching expectation
exp = self._find_resource_expectation(link, size)
if not exp:
self.log.warning(
"Rejecting resource (no matching expectation) link_id=%s size=%s",
self._fmt_link_id(link),
size,
)
self._inc("resources_rejected")
return False
# Accept
self.log.info(
"Accepting resource link_id=%s size=%s kind=%s",
self._fmt_link_id(link),
size,
exp.kind,
)
self._active_resources.setdefault(link, set()).add(resource)
return True
def _resource_concluded(self, resource: RNS.Resource) -> None:
"""Callback when a Resource transfer completes."""
link = resource.link
with self._state_lock:
# Remove from active set
active_set = self._active_resources.get(link)
if active_set:
active_set.discard(resource)
if resource.status != RNS.Resource.COMPLETE:
self.log.warning(
"Resource transfer failed link_id=%s status=%s",
self._fmt_link_id(link),
resource.status,
)
return
# Get payload
try:
payload = resource.data.read() if hasattr(resource.data, "read") else resource.data
if isinstance(payload, bytearray):
payload = bytes(payload)
except Exception as e:
self.log.error(
"Failed to read resource data link_id=%s: %s",
self._fmt_link_id(link),
e,
)
return
size = len(payload)
# Find and remove expectation
exp = self._find_resource_expectation(link, size)
if not exp:
self.log.warning(
"Received resource without expectation link_id=%s size=%s",
self._fmt_link_id(link),
size,
)
return
self._pop_resource_expectation(link, exp.id)
# Verify SHA256 if provided
if exp.sha256:
actual_hash = hashlib.sha256(payload).digest()
if actual_hash != exp.sha256:
self.log.error(
"Resource SHA256 mismatch link_id=%s expected=%s actual=%s",
self._fmt_link_id(link),
exp.sha256.hex(),
actual_hash.hex(),
)
return
self._inc("resources_received")
self._inc("resource_bytes_received", size)
self.log.info(
"Resource received link_id=%s size=%s kind=%s",
self._fmt_link_id(link),
size,
exp.kind,
)
# Dispatch by kind
try:
self._dispatch_received_resource(link, exp, payload)
except Exception as e:
self.log.exception(
"Failed to dispatch resource link_id=%s kind=%s: %s",
self._fmt_link_id(link),
exp.kind,
e,
)
def _dispatch_received_resource(
self, link: RNS.Link, exp: _ResourceExpectation, payload: bytes
) -> None:
"""Dispatch a received resource payload to appropriate handler."""
if exp.kind == RES_KIND_NOTICE:
# Decode as text and deliver as notice
encoding = exp.encoding or "utf-8"
try:
text = payload.decode(encoding)
except Exception as e:
self.log.error(
"Failed to decode notice resource link_id=%s encoding=%s: %s",
self._fmt_link_id(link),
encoding,
e,
)
return
# Log the notice (don't send back to sender)
self.log.info(
"Received large NOTICE via resource link_id=%s room=%r chars=%s",
self._fmt_link_id(link),
exp.room,
len(text),
)
# Note: In a full implementation, this would be forwarded to other room members
# For now, just acknowledge receipt
elif exp.kind == RES_KIND_MOTD:
# Similar to NOTICE
encoding = exp.encoding or "utf-8"
try:
text = payload.decode(encoding)
except Exception as e:
self.log.error(
"Failed to decode MOTD resource link_id=%s: %s",
self._fmt_link_id(link),
e,
)
return
self.log.info(
"Received MOTD via resource link_id=%s chars=%s",
self._fmt_link_id(link),
len(text),
)
elif exp.kind == RES_KIND_BLOB:
# Generic binary data
self.log.info(
"Received BLOB via resource link_id=%s bytes=%s",
self._fmt_link_id(link),
len(payload),
)
else:
self.log.warning(
"Unknown resource kind link_id=%s kind=%s",
self._fmt_link_id(link),
exp.kind,
)
def _send_via_resource(
self,
link: RNS.Link,
*,
kind: str,
payload: bytes,
room: str | None = None,
encoding: str | None = None,
) -> bool:
"""
Send large payload via Resource.
Returns True if successfully initiated, False otherwise.
"""
if not self.config.enable_resource_transfer:
return False
size = len(payload)
if size > self.config.max_resource_bytes:
self.log.error(
"Payload too large for resource transfer: %s > %s",
size,
self.config.max_resource_bytes,
)
return False
# Generate resource ID
rid = os.urandom(8)
# Compute SHA256
sha256 = hashlib.sha256(payload).digest()
# Send envelope first
if self.identity is None:
return False
envelope_body = {
B_RES_ID: rid,
B_RES_KIND: kind,
B_RES_SIZE: size,
B_RES_SHA256: sha256,
}
if encoding:
envelope_body[B_RES_ENCODING] = encoding
envelope = make_envelope(
T_RESOURCE_ENVELOPE,
src=self.identity.hash,
room=room,
body=envelope_body,
)
try:
envelope_payload = encode(envelope)
RNS.Packet(link, envelope_payload).send()
self._inc("bytes_out", len(envelope_payload))
self.log.debug(
"Sent resource envelope link_id=%s rid=%s kind=%s size=%s",
self._fmt_link_id(link),
rid.hex(),
kind,
size,
)
except Exception as e:
self.log.error(
"Failed to send resource envelope link_id=%s: %s",
self._fmt_link_id(link),
e,
)
return False
# Create and advertise resource
try:
resource = RNS.Resource(payload, link, advertise=True, auto_compress=False)
with self._state_lock:
self._active_resources.setdefault(link, set()).add(resource)
self._inc("resources_sent")
self._inc("resource_bytes_sent", size)
self.log.info(
"Sent resource link_id=%s rid=%s kind=%s size=%s",
self._fmt_link_id(link),
rid.hex(),
kind,
size,
)
return True
except Exception as e:
self.log.error(
"Failed to create resource link_id=%s: %s",
self._fmt_link_id(link),
e,
)
return False
def _send_text_smart(
self,
link: RNS.Link,
*,
msg_type: int,
text: str,
room: str | None = None,
encoding: str = "utf-8",
outgoing: list[tuple[RNS.Link, bytes]] | None = None,
) -> None:
"""
Send text message using best method (packet or resource).
Falls back to chunking if resource transfer fails or is disabled.
"""
if self.identity is None:
return
# Try encoding as a single packet first
env = make_envelope(msg_type, src=self.identity.hash, room=room, body=text)
payload = encode(env)
# If it fits, send normally
if self._packet_would_fit(link, payload):
if outgoing is None:
self._send(link, env)
else:
self._queue_env(outgoing, link, env)
return
# Too large for packet - try resource if enabled and type is NOTICE
if (
self.config.enable_resource_transfer
and msg_type == T_NOTICE
and len(text.encode(encoding)) <= self.config.max_resource_bytes
):
text_bytes = text.encode(encoding)
if self._send_via_resource(
link,
kind=RES_KIND_NOTICE,
payload=text_bytes,
room=room,
encoding=encoding,
):
self.log.debug(
"Sent large text via resource link_id=%s chars=%s",
self._fmt_link_id(link),
len(text),
)
return
# Fall back to chunking for NOTICE
if msg_type == T_NOTICE:
if outgoing is None:
outgoing = []
self._queue_notice_chunks(outgoing, link, room=room, text=text)
for out_link, chunk_payload in outgoing:
self._inc("bytes_out", len(chunk_payload))
try:
RNS.Packet(out_link, chunk_payload).send()
except Exception as e:
self.log.warning(
"Failed to send chunk link_id=%s: %s",
self._fmt_link_id(out_link),
e,
)
else:
self._queue_notice_chunks(outgoing, link, room=room, text=text)
else:
# For other message types, just drop or log error
self.log.error(
"Message too large and not NOTICE link_id=%s type=%s",
self._fmt_link_id(link),
msg_type,
)
def start(self) -> None:
self.log.info("Starting Reticulum")
if self._started_wall_time is None:
@@ -346,6 +839,8 @@ class HubService:
self.sessions.clear()
self.rooms.clear()
self._rate.clear()
self._resource_expectations.clear()
self._active_resources.clear()
for link in links:
try:
@@ -1440,6 +1935,15 @@ class HubService:
c.get("pongs_out", 0),
)
)
lines.append(
"resources: sent={} received={} rejected={} bytes_sent={} bytes_received={}".format(
c.get("resources_sent", 0),
c.get("resources_received", 0),
c.get("resources_rejected", 0),
c.get("resource_bytes_sent", 0),
c.get("resource_bytes_received", 0),
)
)
return "\n".join(lines)
@@ -2396,6 +2900,10 @@ class HubService:
tokens=float(self.config.rate_limit_msgs_per_minute),
last_refill=time.monotonic(),
)
# Initialize resource tracking for this link
self._resource_expectations[link] = {}
self._active_resources[link] = set()
link.set_packet_callback(lambda data, pkt: self._on_packet(link, data))
link.set_link_closed_callback(lambda closed_link: self._on_close(closed_link))
@@ -2404,6 +2912,23 @@ class HubService:
identified_link, ident
)
)
# Set up resource callbacks
if self.config.enable_resource_transfer:
try:
link.set_resource_strategy(RNS.Link.ACCEPT_APP)
link.set_resource_callback(self._resource_advertised)
link.set_resource_concluded_callback(self._resource_concluded)
self.log.debug(
"Resource callbacks configured link_id=%s",
self._fmt_link_id(link),
)
except Exception as e:
self.log.warning(
"Failed to set resource callbacks link_id=%s: %s",
self._fmt_link_id(link),
e,
)
self.log.info("Link established link_id=%s", self._fmt_link_id(link))
@@ -2490,6 +3015,11 @@ class HubService:
with self._state_lock:
sess = self.sessions.pop(link, None)
self._rate.pop(link, None)
# Clean up resource state
self._resource_expectations.pop(link, None)
self._active_resources.pop(link, None)
if not sess:
return
@@ -2675,6 +3205,117 @@ class HubService:
sess["awaiting_pong"] = None
return
if t == T_RESOURCE_ENVELOPE:
# Handle resource envelope announcement
if not self.config.enable_resource_transfer:
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text="resource transfer disabled",
room=room,
)
return
if not isinstance(body, dict):
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text="invalid resource envelope body",
room=room,
)
return
rid = body.get(B_RES_ID)
kind = body.get(B_RES_KIND)
size = body.get(B_RES_SIZE)
sha256 = body.get(B_RES_SHA256)
encoding = body.get(B_RES_ENCODING)
# Validate required fields
if not isinstance(rid, (bytes, bytearray)):
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text="resource envelope missing id",
room=room,
)
return
if not isinstance(kind, str) or not kind:
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text="resource envelope missing kind",
room=room,
)
return
if not isinstance(size, int) or size < 0:
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text="resource envelope invalid size",
room=room,
)
return
# Check size limit
if size > self.config.max_resource_bytes:
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text=f"resource too large: {size} > {self.config.max_resource_bytes}",
room=room,
)
return
# Validate optional fields
if sha256 is not None and not isinstance(sha256, (bytes, bytearray)):
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text="resource envelope invalid sha256",
room=room,
)
return
if encoding is not None and not isinstance(encoding, str):
encoding = None
# Add expectation
if not self._add_resource_expectation(
link,
rid=bytes(rid),
kind=kind,
size=size,
sha256=bytes(sha256) if sha256 else None,
encoding=encoding,
room=room,
):
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text="too many pending resource expectations",
room=room,
)
return
if not sess["welcomed"]:
if t != T_HELLO:
if self.identity is not None:
+97
View File
@@ -0,0 +1,97 @@
"""Tests for resource transfer functionality."""
import hashlib
import os
from rrcd.codec import decode, encode
from rrcd.constants import (
B_RES_ENCODING,
B_RES_ID,
B_RES_KIND,
B_RES_SHA256,
B_RES_SIZE,
K_BODY,
K_SRC,
K_T,
RES_KIND_NOTICE,
T_RESOURCE_ENVELOPE,
)
from rrcd.envelope import make_envelope
def test_resource_envelope_serialization():
"""Test that resource envelopes can be created and serialized."""
src = os.urandom(16)
rid = os.urandom(8)
payload = b"This is a test payload that is larger than typical MDU"
sha256 = hashlib.sha256(payload).digest()
body = {
B_RES_ID: rid,
B_RES_KIND: RES_KIND_NOTICE,
B_RES_SIZE: len(payload),
B_RES_SHA256: sha256,
B_RES_ENCODING: "utf-8",
}
envelope = make_envelope(
T_RESOURCE_ENVELOPE,
src=src,
room="test",
body=body,
)
# Serialize and deserialize
encoded = encode(envelope)
decoded = decode(encoded)
assert decoded[K_T] == T_RESOURCE_ENVELOPE
assert decoded[K_SRC] == src
decoded_body = decoded[K_BODY]
assert decoded_body[B_RES_ID] == rid
assert decoded_body[B_RES_KIND] == RES_KIND_NOTICE
assert decoded_body[B_RES_SIZE] == len(payload)
assert decoded_body[B_RES_SHA256] == sha256
assert decoded_body[B_RES_ENCODING] == "utf-8"
def test_resource_envelope_minimal():
"""Test resource envelope with minimal required fields."""
src = os.urandom(16)
rid = os.urandom(8)
body = {
B_RES_ID: rid,
B_RES_KIND: "blob",
B_RES_SIZE: 1024,
}
envelope = make_envelope(
T_RESOURCE_ENVELOPE,
src=src,
body=body,
)
encoded = encode(envelope)
decoded = decode(encoded)
decoded_body = decoded[K_BODY]
assert B_RES_SHA256 not in decoded_body
assert B_RES_ENCODING not in decoded_body
assert decoded_body[B_RES_SIZE] == 1024
def test_sha256_verification():
"""Test SHA256 hash computation for payload verification."""
payload = b"Test payload for SHA256 verification"
expected = hashlib.sha256(payload).digest()
# Verify we can compute and compare hashes correctly
computed = hashlib.sha256(payload).digest()
assert computed == expected
assert len(computed) == 32
# Verify mismatch detection
wrong_payload = b"Different payload"
wrong_hash = hashlib.sha256(wrong_payload).digest()
assert wrong_hash != expected