From 36e06ebfe844fcfb1e4fb92adf43cdaeb72fa027 Mon Sep 17 00:00:00 2001 From: kc1awv Date: Wed, 7 Jan 2026 08:31:25 -0500 Subject: [PATCH] refactor resource management --- rrcd/resources.py | 581 ++++++++++++++++++++++++++++++++++++++++++++++ rrcd/router.py | 2 +- rrcd/service.py | 552 +------------------------------------------ 3 files changed, 593 insertions(+), 542 deletions(-) create mode 100644 rrcd/resources.py diff --git a/rrcd/resources.py b/rrcd/resources.py new file mode 100644 index 0000000..57bfd7e --- /dev/null +++ b/rrcd/resources.py @@ -0,0 +1,581 @@ +"""Resource transfer management for RRCD.""" + +from __future__ import annotations + +import hashlib +import os +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import RNS + +from rrcd.codec import encode +from rrcd.constants import ( + B_RES_ENCODING, + B_RES_ID, + B_RES_KIND, + B_RES_SHA256, + B_RES_SIZE, + RES_KIND_BLOB, + RES_KIND_MOTD, + RES_KIND_NOTICE, + T_NOTICE, + T_RESOURCE_ENVELOPE, +) +from rrcd.envelope import make_envelope + +if TYPE_CHECKING: + from rrcd.service import HubService + + +@dataclass +class _ResourceExpectation: + """Tracks an expected incoming Resource transfer.""" + id: bytes + kind: str + size: int + sha256: bytes | None + encoding: str | None + created_at: float + expires_at: float + room: str | None = None + + +class ResourceManager: + """Manages RNS Resource transfers for the hub.""" + + def __init__(self, hub: HubService) -> None: + self.hub = hub + self.log = hub.log + + # Resource state + self._resource_expectations: dict[RNS.Link, dict[bytes, _ResourceExpectation]] = {} + self._active_resources: dict[RNS.Link, set[RNS.Resource]] = {} + # Tracks which expectation RID was matched to an advertised Resource. + self._resource_bindings: dict[RNS.Resource, bytes] = {} + + def on_link_established(self, link: RNS.Link) -> None: + """Initialize resource tracking for a new link.""" + self._resource_expectations[link] = {} + self._active_resources[link] = set() + + def on_link_closed(self, link: RNS.Link) -> None: + """Clean up resource state when a link closes.""" + self._resource_expectations.pop(link, None) + self._active_resources.pop(link, None) + + def clear_all(self) -> None: + """Clear all resource state (called during shutdown).""" + self._resource_expectations.clear() + self._active_resources.clear() + + def configure_link_callbacks(self, link: RNS.Link) -> None: + """Set up resource callbacks for a link if resource transfer is enabled.""" + if not self.hub.config.enable_resource_transfer: + return + + try: + link.set_resource_strategy(RNS.Link.ACCEPT_APP) + link.set_resource_callback(self._resource_advertised) + link.set_resource_concluded_callback(self._resource_concluded) + self.log.debug( + "Resource callbacks configured link_id=%s", + self.hub._fmt_link_id(link), + ) + except Exception as e: + self.log.warning( + "Failed to set resource callbacks link_id=%s: %s", + self.hub._fmt_link_id(link), + e, + ) + + # Resource expectation management + + def cleanup_expired_expectations(self, link: RNS.Link) -> None: + """Remove expired resource expectations for a link.""" + now = time.time() + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return + + expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now] + for rid in expired: + exp_dict.pop(rid, None) + self.log.debug( + "Expired resource expectation link_id=%s rid=%s", + self.hub._fmt_link_id(link), + rid.hex() if isinstance(rid, bytes) else rid, + ) + + def cleanup_all_expired_expectations(self) -> None: + """Cleanup expired resource expectations across all links.""" + now = time.time() + with self.hub._state_lock: + for link, exp_dict in list(self._resource_expectations.items()): + if not exp_dict: + continue + + expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now] + for rid in expired: + exp_dict.pop(rid, None) + self.log.debug( + "Expired resource expectation link_id=%s rid=%s", + self.hub._fmt_link_id(link), + rid.hex() if isinstance(rid, bytes) else rid, + ) + + def add_resource_expectation( + self, + link: RNS.Link, + *, + rid: bytes, + kind: str, + size: int, + sha256: bytes | None = None, + encoding: str | None = None, + room: str | None = None, + ) -> bool: + """Add a resource expectation. Returns False if limit exceeded.""" + self.cleanup_expired_expectations(link) + + exp_dict = self._resource_expectations.setdefault(link, {}) + + if len(exp_dict) >= self.hub.config.max_pending_resource_expectations: + self.log.warning( + "Max pending expectations exceeded link_id=%s", + self.hub._fmt_link_id(link), + ) + return False + + now = time.time() + exp = _ResourceExpectation( + id=rid, + kind=kind, + size=size, + sha256=sha256, + encoding=encoding, + created_at=now, + expires_at=now + self.hub.config.resource_expectation_ttl_s, + room=room, + ) + exp_dict[rid] = exp + + self.log.debug( + "Added resource expectation link_id=%s rid=%s kind=%s size=%s", + self.hub._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + return True + + def find_resource_expectation( + self, link: RNS.Link, size: int + ) -> _ResourceExpectation | None: + """Find a matching resource expectation by size (fallback matching).""" + self.cleanup_expired_expectations(link) + + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return None + + # Match by size (first match wins) + for exp in exp_dict.values(): + if exp.size == size: + return exp + + return None + + def get_resource_expectation_by_rid( + self, link: RNS.Link, rid: bytes + ) -> _ResourceExpectation | None: + """Lookup an expectation by RID without removing it.""" + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return None + return exp_dict.get(rid) + + def match_resource_expectation( + self, link: RNS.Link, *, rid: bytes | None, size: int, sha256: bytes | None + ) -> _ResourceExpectation | None: + """Find the expectation that should satisfy a completed resource. + + Preference order: + 1) Bound RID (from advertisement) when available. + 2) Exact RID lookup. + 3) Fallback: first size match whose sha256 (if present) matches. + """ + self.cleanup_expired_expectations(link) + + if rid is not None: + exp = self.get_resource_expectation_by_rid(link, rid) + if exp is not None: + return exp + + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return None + + # Avoid linear scan if nothing matches by size. + for exp in exp_dict.values(): + if exp.size != size: + continue + if exp.sha256 and sha256 and exp.sha256 != sha256: + continue + return exp + return None + + def pop_resource_expectation( + self, link: RNS.Link, rid: bytes + ) -> _ResourceExpectation | None: + """Remove and return a resource expectation.""" + exp_dict = self._resource_expectations.get(link) + if not exp_dict: + return None + return exp_dict.pop(rid, None) + + # Resource transfer callbacks + + def _resource_advertised(self, resource: RNS.Resource) -> bool: + """ + Callback when a Resource is advertised by remote peer. + Returns True to accept, False to reject. + + Minimize lock scope to prevent potential deadlocks with RNS internal locks. + """ + link = resource.link + + # Check config outside lock (immutable during runtime) + if not self.hub.config.enable_resource_transfer: + self.log.debug( + "Rejecting resource (disabled) link_id=%s", + self.hub._fmt_link_id(link), + ) + self.hub._inc("resources_rejected") + return False + + # Check size limit (immutable config) + size = resource.total_size if hasattr(resource, "total_size") else resource.size + if size > self.hub.config.max_resource_bytes: + self.log.warning( + "Rejecting resource (too large: %s > %s) link_id=%s", + size, + self.hub.config.max_resource_bytes, + self.hub._fmt_link_id(link), + ) + self.hub._inc("resources_rejected") + return False + + # Check session exists and find expectation with minimal lock scope + with self.hub._state_lock: + sess = self.hub.session_manager.sessions.get(link) + if not sess: + self.log.debug( + "Rejecting resource (no session) link_id=%s", + self.hub._fmt_link_id(link), + ) + self.hub._inc("resources_rejected") + return False + + # Find matching expectation + exp = self.find_resource_expectation(link, size) + + # Check expectation outside lock + if not exp: + self.log.warning( + "Rejecting resource (no matching expectation) link_id=%s size=%s", + self.hub._fmt_link_id(link), + size, + ) + self.hub._inc("resources_rejected") + return False + + # Accept and register with minimal lock scope + self.log.info( + "Accepting resource link_id=%s size=%s kind=%s", + self.hub._fmt_link_id(link), + size, + exp.kind, + ) + + with self.hub._state_lock: + self._active_resources.setdefault(link, set()).add(resource) + # Remember which expectation RID this resource was matched to so the + # conclusion handler can verify and pop the correct entry. + self._resource_bindings[resource] = exp.id + + return True + + def _resource_concluded(self, resource: RNS.Resource) -> None: + """Callback when a Resource transfer completes.""" + link = resource.link + + with self.hub._state_lock: + # Remove from active set and retrieve any bound expectation RID. + active_set = self._active_resources.get(link) + if active_set: + active_set.discard(resource) + bound_rid = self._resource_bindings.pop(resource, None) + + if resource.status != RNS.Resource.COMPLETE: + self.log.warning( + "Resource transfer failed link_id=%s status=%s", + self.hub._fmt_link_id(link), + resource.status, + ) + return + + # Get payload outside the lock. + try: + payload = resource.data.read() if hasattr(resource.data, "read") else resource.data + if isinstance(payload, bytearray): + payload = bytes(payload) + except Exception as e: + self.log.error( + "Failed to read resource data link_id=%s: %s", + self.hub._fmt_link_id(link), + e, + ) + return + + size = len(payload) + actual_hash = hashlib.sha256(payload).digest() + + # Find expectation using bound RID first, then RID lookup, then size/sha fallback. + exp = self.match_resource_expectation(link, rid=bound_rid, size=size, sha256=actual_hash) + if not exp: + self.log.warning( + "Received resource without expectation link_id=%s size=%s", + self.hub._fmt_link_id(link), + size, + ) + return + + # Verify SHA256 if provided; keep expectation if mismatch so sender can retry. + if exp.sha256 and actual_hash != exp.sha256: + self.log.error( + "Resource SHA256 mismatch link_id=%s expected=%s actual=%s", + self.hub._fmt_link_id(link), + exp.sha256.hex(), + actual_hash.hex(), + ) + return + + # Pop expectation only after validation succeeds. + self.pop_resource_expectation(link, exp.id) + + self.hub._inc("resources_received") + self.hub._inc("resource_bytes_received", size) + + self.log.info( + "Resource received link_id=%s size=%s kind=%s", + self.hub._fmt_link_id(link), + size, + exp.kind, + ) + + # Dispatch by kind + try: + self._dispatch_received_resource(link, exp, payload) + except Exception as e: + self.log.exception( + "Failed to dispatch resource link_id=%s kind=%s: %s", + self.hub._fmt_link_id(link), + exp.kind, + e, + ) + + def _dispatch_received_resource( + self, link: RNS.Link, exp: _ResourceExpectation, payload: bytes + ) -> None: + """Dispatch a received resource payload to appropriate handler.""" + if exp.kind == RES_KIND_NOTICE: + # Decode as text and deliver as notice + encoding = exp.encoding or "utf-8" + try: + text = payload.decode(encoding) + except Exception as e: + self.log.error( + "Failed to decode notice resource link_id=%s encoding=%s: %s", + self.hub._fmt_link_id(link), + encoding, + e, + ) + return + + self.log.info( + "Received large NOTICE via resource link_id=%s room=%r chars=%s", + self.hub._fmt_link_id(link), + exp.room, + len(text), + ) + + # Forward NOTICE to room members if room is specified + if exp.room and self.hub.identity is not None: + with self.hub._state_lock: + sess = self.hub.session_manager.sessions.get(link) + peer_hash = sess.get("peer") if sess else None + room_members = self.hub.rooms.get(exp.room, set()) + + if peer_hash and room_members: + notice_env = make_envelope( + T_NOTICE, + src=peer_hash, + room=exp.room, + body=text, + ) + notice_payload = encode(notice_env) + + # Forward to all room members except sender + forwarded = 0 + for other in room_members: + if other != link: + try: + other.packet(notice_payload) + forwarded += 1 + except Exception as e: + self.log.warning( + "Failed to forward NOTICE resource link_id=%s: %s", + self.hub._fmt_link_id(other), + e, + ) + + if forwarded > 0: + self.hub._inc("notices_forwarded") + self.log.debug( + "Forwarded NOTICE resource to %d members room=%s", + forwarded, + exp.room, + ) + + elif exp.kind == RES_KIND_MOTD: + # Similar to NOTICE + encoding = exp.encoding or "utf-8" + try: + text = payload.decode(encoding) + except Exception as e: + self.log.error( + "Failed to decode MOTD resource link_id=%s: %s", + self.hub._fmt_link_id(link), + e, + ) + return + + self.log.info( + "Received MOTD via resource link_id=%s chars=%s", + self.hub._fmt_link_id(link), + len(text), + ) + + elif exp.kind == RES_KIND_BLOB: + # Generic binary data + self.log.info( + "Received BLOB via resource link_id=%s bytes=%s", + self.hub._fmt_link_id(link), + len(payload), + ) + else: + self.log.warning( + "Unknown resource kind link_id=%s kind=%s", + self.hub._fmt_link_id(link), + exp.kind, + ) + + def send_via_resource( + self, + link: RNS.Link, + *, + kind: str, + payload: bytes, + room: str | None = None, + encoding: str | None = None, + ) -> bool: + """ + Send large payload via Resource. + Returns True if successfully initiated, False otherwise. + """ + if not self.hub.config.enable_resource_transfer: + return False + + size = len(payload) + if size > self.hub.config.max_resource_bytes: + self.log.error( + "Payload too large for resource transfer: %s > %s", + size, + self.hub.config.max_resource_bytes, + ) + return False + + # Generate resource ID + rid = os.urandom(8) + + # Compute SHA256 + sha256 = hashlib.sha256(payload).digest() + + # Send envelope first + if self.hub.identity is None: + return False + + envelope_body = { + B_RES_ID: rid, + B_RES_KIND: kind, + B_RES_SIZE: size, + B_RES_SHA256: sha256, + } + if encoding: + envelope_body[B_RES_ENCODING] = encoding + + envelope = make_envelope( + T_RESOURCE_ENVELOPE, + src=self.hub.identity.hash, + room=room, + body=envelope_body, + ) + + try: + envelope_payload = encode(envelope) + RNS.Packet(link, envelope_payload).send() + self.hub._inc("bytes_out", len(envelope_payload)) + + self.log.debug( + "Sent resource envelope link_id=%s rid=%s kind=%s size=%s", + self.hub._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + except Exception as e: + self.log.error( + "Failed to send resource envelope link_id=%s: %s", + self.hub._fmt_link_id(link), + e, + ) + return False + + # Create and advertise resource + try: + resource = RNS.Resource(payload, link, advertise=True, auto_compress=False) + + with self.hub._state_lock: + self._active_resources.setdefault(link, set()).add(resource) + + self.hub._inc("resources_sent") + self.hub._inc("resource_bytes_sent", size) + + self.log.info( + "Sent resource link_id=%s rid=%s kind=%s size=%s", + self.hub._fmt_link_id(link), + rid.hex(), + kind, + size, + ) + return True + + except Exception as e: + self.log.error( + "Failed to create resource link_id=%s: %s", + self.hub._fmt_link_id(link), + e, + ) + return False diff --git a/rrcd/router.py b/rrcd/router.py index 46a29ff..7642591 100644 --- a/rrcd/router.py +++ b/rrcd/router.py @@ -258,7 +258,7 @@ class MessageRouter: encoding = None # Add expectation - if not self.hub._add_resource_expectation( + if not self.hub.resource_manager.add_resource_expectation( link, rid=bytes(rid), kind=kind, diff --git a/rrcd/service.py b/rrcd/service.py index 5f1c566..b9863bf 100644 --- a/rrcd/service.py +++ b/rrcd/service.py @@ -1,12 +1,11 @@ from __future__ import annotations -import hashlib import logging import os import signal import threading import time -from dataclasses import asdict, dataclass, replace +from dataclasses import asdict, replace from typing import Any import RNS @@ -16,42 +15,23 @@ from .codec import encode from .commands import CommandHandler from .config import HubRuntimeConfig from .constants import ( - B_RES_ENCODING, - B_RES_ID, - B_RES_KIND, - B_RES_SHA256, - B_RES_SIZE, B_WELCOME_HUB, B_WELCOME_VER, - RES_KIND_BLOB, RES_KIND_MOTD, RES_KIND_NOTICE, T_ERROR, T_NOTICE, T_PING, - T_RESOURCE_ENVELOPE, T_WELCOME, ) from .envelope import make_envelope from .logging_config import configure_logging +from .resources import ResourceManager from .router import MessageRouter from .session import SessionManager from .util import expand_path -@dataclass -class _ResourceExpectation: - """Tracks an expected incoming Resource transfer.""" - id: bytes - kind: str - size: int - sha256: bytes | None - encoding: str | None - created_at: float - expires_at: float - room: str | None = None - - class HubService: def __init__(self, config: HubRuntimeConfig) -> None: self.config = config @@ -72,18 +52,15 @@ class HubService: # Command handler for operator commands self.command_handler = CommandHandler(self) + + # Resource manager for file/data transfers + self.resource_manager = ResourceManager(self) self.identity: RNS.Identity | None = None self.destination: RNS.Destination | None = None self.rooms: dict[str, set[RNS.Link]] = {} - # Resource transfer state - self._resource_expectations: dict[RNS.Link, dict[bytes, _ResourceExpectation]] = {} - self._active_resources: dict[RNS.Link, set[RNS.Resource]] = {} - # Tracks which expectation RID was matched to an advertised Resource. - self._resource_bindings: dict[RNS.Resource, bytes] = {} - self._trusted: set[bytes] = set() self._banned: set[bytes] = set() @@ -263,492 +240,6 @@ class HubService: # Resource transfer methods - def _cleanup_expired_expectations(self, link: RNS.Link) -> None: - """Remove expired resource expectations for a link.""" - now = time.time() - exp_dict = self._resource_expectations.get(link) - if not exp_dict: - return - - expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now] - for rid in expired: - exp_dict.pop(rid, None) - self.log.debug( - "Expired resource expectation link_id=%s rid=%s", - self._fmt_link_id(link), - rid.hex() if isinstance(rid, bytes) else rid, - ) - - def _cleanup_all_expired_expectations(self) -> None: - """Cleanup expired resource expectations across all links.""" - now = time.time() - with self._state_lock: - for link, exp_dict in list(self._resource_expectations.items()): - if not exp_dict: - continue - - expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now] - for rid in expired: - exp_dict.pop(rid, None) - self.log.debug( - "Expired resource expectation link_id=%s rid=%s", - self._fmt_link_id(link), - rid.hex() if isinstance(rid, bytes) else rid, - ) - - def _add_resource_expectation( - self, - link: RNS.Link, - *, - rid: bytes, - kind: str, - size: int, - sha256: bytes | None = None, - encoding: str | None = None, - room: str | None = None, - ) -> bool: - """Add a resource expectation. Returns False if limit exceeded.""" - self._cleanup_expired_expectations(link) - - exp_dict = self._resource_expectations.setdefault(link, {}) - - if len(exp_dict) >= self.config.max_pending_resource_expectations: - self.log.warning( - "Max pending expectations exceeded link_id=%s", - self._fmt_link_id(link), - ) - return False - - now = time.time() - exp = _ResourceExpectation( - id=rid, - kind=kind, - size=size, - sha256=sha256, - encoding=encoding, - created_at=now, - expires_at=now + self.config.resource_expectation_ttl_s, - room=room, - ) - exp_dict[rid] = exp - - self.log.debug( - "Added resource expectation link_id=%s rid=%s kind=%s size=%s", - self._fmt_link_id(link), - rid.hex(), - kind, - size, - ) - return True - - def _find_resource_expectation( - self, link: RNS.Link, size: int - ) -> _ResourceExpectation | None: - """Find a matching resource expectation by size (fallback matching).""" - self._cleanup_expired_expectations(link) - - exp_dict = self._resource_expectations.get(link) - if not exp_dict: - return None - - # Match by size (first match wins) - for exp in exp_dict.values(): - if exp.size == size: - return exp - - return None - - def _get_resource_expectation_by_rid( - self, link: RNS.Link, rid: bytes - ) -> _ResourceExpectation | None: - """Lookup an expectation by RID without removing it.""" - exp_dict = self._resource_expectations.get(link) - if not exp_dict: - return None - return exp_dict.get(rid) - - def _match_resource_expectation( - self, link: RNS.Link, *, rid: bytes | None, size: int, sha256: bytes | None - ) -> _ResourceExpectation | None: - """Find the expectation that should satisfy a completed resource. - - Preference order: - 1) Bound RID (from advertisement) when available. - 2) Exact RID lookup. - 3) Fallback: first size match whose sha256 (if present) matches. - """ - self._cleanup_expired_expectations(link) - - if rid is not None: - exp = self._get_resource_expectation_by_rid(link, rid) - if exp is not None: - return exp - - exp_dict = self._resource_expectations.get(link) - if not exp_dict: - return None - - # Avoid linear scan if nothing matches by size. - for exp in exp_dict.values(): - if exp.size != size: - continue - if exp.sha256 and sha256 and exp.sha256 != sha256: - continue - return exp - return None - - def _pop_resource_expectation( - self, link: RNS.Link, rid: bytes - ) -> _ResourceExpectation | None: - """Remove and return a resource expectation.""" - exp_dict = self._resource_expectations.get(link) - if not exp_dict: - return None - return exp_dict.pop(rid, None) - - def _resource_advertised(self, resource: RNS.Resource) -> bool: - """ - Callback when a Resource is advertised by remote peer. - Returns True to accept, False to reject. - - Minimize lock scope to prevent potential deadlocks with RNS internal locks. - """ - link = resource.link - - # Check config outside lock (immutable during runtime) - if not self.config.enable_resource_transfer: - self.log.debug( - "Rejecting resource (disabled) link_id=%s", - self._fmt_link_id(link), - ) - self._inc("resources_rejected") - return False - - # Check size limit (immutable config) - size = resource.total_size if hasattr(resource, "total_size") else resource.size - if size > self.config.max_resource_bytes: - self.log.warning( - "Rejecting resource (too large: %s > %s) link_id=%s", - size, - self.config.max_resource_bytes, - self._fmt_link_id(link), - ) - self._inc("resources_rejected") - return False - - # Check session exists and find expectation with minimal lock scope - with self._state_lock: - sess = self.session_manager.sessions.get(link) - if not sess: - self.log.debug( - "Rejecting resource (no session) link_id=%s", - self._fmt_link_id(link), - ) - self._inc("resources_rejected") - return False - - # Find matching expectation - exp = self._find_resource_expectation(link, size) - - # Check expectation outside lock - if not exp: - self.log.warning( - "Rejecting resource (no matching expectation) link_id=%s size=%s", - self._fmt_link_id(link), - size, - ) - self._inc("resources_rejected") - return False - - # Accept and register with minimal lock scope - self.log.info( - "Accepting resource link_id=%s size=%s kind=%s", - self._fmt_link_id(link), - size, - exp.kind, - ) - - with self._state_lock: - self._active_resources.setdefault(link, set()).add(resource) - # Remember which expectation RID this resource was matched to so the - # conclusion handler can verify and pop the correct entry. - self._resource_bindings[resource] = exp.id - - return True - - def _resource_concluded(self, resource: RNS.Resource) -> None: - """Callback when a Resource transfer completes.""" - link = resource.link - - with self._state_lock: - # Remove from active set and retrieve any bound expectation RID. - active_set = self._active_resources.get(link) - if active_set: - active_set.discard(resource) - bound_rid = self._resource_bindings.pop(resource, None) - - if resource.status != RNS.Resource.COMPLETE: - self.log.warning( - "Resource transfer failed link_id=%s status=%s", - self._fmt_link_id(link), - resource.status, - ) - return - - # Get payload outside the lock. - try: - payload = resource.data.read() if hasattr(resource.data, "read") else resource.data - if isinstance(payload, bytearray): - payload = bytes(payload) - except Exception as e: - self.log.error( - "Failed to read resource data link_id=%s: %s", - self._fmt_link_id(link), - e, - ) - return - - size = len(payload) - actual_hash = hashlib.sha256(payload).digest() - - # Find expectation using bound RID first, then RID lookup, then size/sha fallback. - exp = self._match_resource_expectation(link, rid=bound_rid, size=size, sha256=actual_hash) - if not exp: - self.log.warning( - "Received resource without expectation link_id=%s size=%s", - self._fmt_link_id(link), - size, - ) - return - - # Verify SHA256 if provided; keep expectation if mismatch so sender can retry. - if exp.sha256 and actual_hash != exp.sha256: - self.log.error( - "Resource SHA256 mismatch link_id=%s expected=%s actual=%s", - self._fmt_link_id(link), - exp.sha256.hex(), - actual_hash.hex(), - ) - return - - # Pop expectation only after validation succeeds. - self._pop_resource_expectation(link, exp.id) - - self._inc("resources_received") - self._inc("resource_bytes_received", size) - - self.log.info( - "Resource received link_id=%s size=%s kind=%s", - self._fmt_link_id(link), - size, - exp.kind, - ) - - # Dispatch by kind - try: - self._dispatch_received_resource(link, exp, payload) - except Exception as e: - self.log.exception( - "Failed to dispatch resource link_id=%s kind=%s: %s", - self._fmt_link_id(link), - exp.kind, - e, - ) - - def _dispatch_received_resource( - self, link: RNS.Link, exp: _ResourceExpectation, payload: bytes - ) -> None: - """Dispatch a received resource payload to appropriate handler.""" - if exp.kind == RES_KIND_NOTICE: - # Decode as text and deliver as notice - encoding = exp.encoding or "utf-8" - try: - text = payload.decode(encoding) - except Exception as e: - self.log.error( - "Failed to decode notice resource link_id=%s encoding=%s: %s", - self._fmt_link_id(link), - encoding, - e, - ) - return - - self.log.info( - "Received large NOTICE via resource link_id=%s room=%r chars=%s", - self._fmt_link_id(link), - exp.room, - len(text), - ) - - # Forward NOTICE to room members if room is specified - if exp.room and self.identity is not None: - with self._state_lock: - sess = self.session_manager.sessions.get(link) - peer_hash = sess.get("peer") if sess else None - room_members = self.rooms.get(exp.room, set()) - - if peer_hash and room_members: - notice_env = make_envelope( - T_NOTICE, - src=peer_hash, - room=exp.room, - body=text, - ) - notice_payload = encode(notice_env) - - # Forward to all room members except sender - forwarded = 0 - for other in room_members: - if other != link: - try: - other.packet(notice_payload) - forwarded += 1 - except Exception as e: - self.log.warning( - "Failed to forward NOTICE resource link_id=%s: %s", - self._fmt_link_id(other), - e, - ) - - if forwarded > 0: - self._inc("notices_forwarded") - self.log.debug( - "Forwarded NOTICE resource to %d members room=%s", - forwarded, - exp.room, - ) - - elif exp.kind == RES_KIND_MOTD: - # Similar to NOTICE - encoding = exp.encoding or "utf-8" - try: - text = payload.decode(encoding) - except Exception as e: - self.log.error( - "Failed to decode MOTD resource link_id=%s: %s", - self._fmt_link_id(link), - e, - ) - return - - self.log.info( - "Received MOTD via resource link_id=%s chars=%s", - self._fmt_link_id(link), - len(text), - ) - - elif exp.kind == RES_KIND_BLOB: - # Generic binary data - self.log.info( - "Received BLOB via resource link_id=%s bytes=%s", - self._fmt_link_id(link), - len(payload), - ) - else: - self.log.warning( - "Unknown resource kind link_id=%s kind=%s", - self._fmt_link_id(link), - exp.kind, - ) - - def _send_via_resource( - self, - link: RNS.Link, - *, - kind: str, - payload: bytes, - room: str | None = None, - encoding: str | None = None, - ) -> bool: - """ - Send large payload via Resource. - Returns True if successfully initiated, False otherwise. - """ - if not self.config.enable_resource_transfer: - return False - - size = len(payload) - if size > self.config.max_resource_bytes: - self.log.error( - "Payload too large for resource transfer: %s > %s", - size, - self.config.max_resource_bytes, - ) - return False - - # Generate resource ID - rid = os.urandom(8) - - # Compute SHA256 - sha256 = hashlib.sha256(payload).digest() - - # Send envelope first - if self.identity is None: - return False - - envelope_body = { - B_RES_ID: rid, - B_RES_KIND: kind, - B_RES_SIZE: size, - B_RES_SHA256: sha256, - } - if encoding: - envelope_body[B_RES_ENCODING] = encoding - - envelope = make_envelope( - T_RESOURCE_ENVELOPE, - src=self.identity.hash, - room=room, - body=envelope_body, - ) - - try: - envelope_payload = encode(envelope) - RNS.Packet(link, envelope_payload).send() - self._inc("bytes_out", len(envelope_payload)) - - self.log.debug( - "Sent resource envelope link_id=%s rid=%s kind=%s size=%s", - self._fmt_link_id(link), - rid.hex(), - kind, - size, - ) - except Exception as e: - self.log.error( - "Failed to send resource envelope link_id=%s: %s", - self._fmt_link_id(link), - e, - ) - return False - - # Create and advertise resource - try: - resource = RNS.Resource(payload, link, advertise=True, auto_compress=False) - - with self._state_lock: - self._active_resources.setdefault(link, set()).add(resource) - - self._inc("resources_sent") - self._inc("resource_bytes_sent", size) - - self.log.info( - "Sent resource link_id=%s rid=%s kind=%s size=%s", - self._fmt_link_id(link), - rid.hex(), - kind, - size, - ) - return True - - except Exception as e: - self.log.error( - "Failed to create resource link_id=%s: %s", - self._fmt_link_id(link), - e, - ) - return False - def _send_text_smart( self, link: RNS.Link, @@ -954,8 +445,7 @@ class HubService: with self._state_lock: links = self.session_manager.clear_all() self.rooms.clear() - self._resource_expectations.clear() - self._active_resources.clear() + self.resource_manager.clear_all() for link in links: try: @@ -1883,7 +1373,7 @@ class HubService: if self._shutdown.is_set(): break try: - self._cleanup_all_expired_expectations() + self.resource_manager.cleanup_all_expired_expectations() except Exception: self.log.exception("Resource cleanup failed") @@ -2109,10 +1599,7 @@ class HubService: def _on_link(self, link: RNS.Link) -> None: with self._state_lock: self.session_manager.on_link_established(link) - - # Initialize resource tracking for this link - self._resource_expectations[link] = {} - self._active_resources[link] = set() + self.resource_manager.on_link_established(link) link.set_packet_callback(lambda data, pkt: self._on_packet(link, data)) link.set_link_closed_callback(lambda closed_link: self._on_close(closed_link)) @@ -2123,21 +1610,7 @@ class HubService: ) # Set up resource callbacks - if self.config.enable_resource_transfer: - try: - link.set_resource_strategy(RNS.Link.ACCEPT_APP) - link.set_resource_callback(self._resource_advertised) - link.set_resource_concluded_callback(self._resource_concluded) - self.log.debug( - "Resource callbacks configured link_id=%s", - self._fmt_link_id(link), - ) - except Exception as e: - self.log.warning( - "Failed to set resource callbacks link_id=%s: %s", - self._fmt_link_id(link), - e, - ) + self.resource_manager.configure_link_callbacks(link) self.log.info("Link established link_id=%s", self._fmt_link_id(link)) @@ -2204,11 +1677,8 @@ class HubService: rooms_count = 0 with self._state_lock: - # Clean up resource state - self._resource_expectations.pop(link, None) - self._active_resources.pop(link, None) - - # Clean up session (also handles room cleanup) + # Clean up resource and session state + self.resource_manager.on_link_closed(link) peer, nick, rooms_count = self.session_manager.on_link_closed(link) self.log.info(