From 5be73dd7a81858cf8824f76e9410482f94002255 Mon Sep 17 00:00:00 2001
From: kc1awv
Date: Tue, 6 Jan 2026 09:38:56 -0500
Subject: [PATCH] fix potential deadlock in _resource_advertised add resource timeout cleanup

---
Review notes (text in this section is ignored by `git am`):

- NOTE(review): the line structure of this patch was reconstructed from a
  whitespace-collapsed copy. Indentation and the position of blank context
  lines are inferred; regenerate with `git format-patch` before applying.
  The commit id and `index` blob ids below are those of the submitted
  patch and were not recomputed.
- _resource_advertised: the expectation lookup and the registration in
  _active_resources now happen under a single acquisition of _state_lock
  (as before this patch); only the logging moved outside the lock.
- _cleanup_all_expired_expectations: debug logging moved outside the lock.
- TODO(review): `_resource_cleanup_loop` is used as a thread target but is
  not defined by this patch, and nothing visible here calls
  `_cleanup_all_expired_expectations` - confirm both against the file.

 rrcd/service.py | 103 ++++++++++++++++++++++++++++++++++---------------
 1 file changed, 72 insertions(+), 31 deletions(-)

diff --git a/rrcd/service.py b/rrcd/service.py
index 6e970c8..a740466 100644
--- a/rrcd/service.py
+++ b/rrcd/service.py
@@ -106,11 +106,14 @@ class HubService:
 
         self._ping_thread: threading.Thread | None = None
         self._announce_thread: threading.Thread | None = None
+        self._resource_cleanup_thread: threading.Thread | None = None
 
         self._config_write_lock = threading.Lock()
         self._started_wall_time: float | None = None
         self._started_monotonic: float | None = None
 
+        # Lifetime counters for uptime statistics (monotonically increasing after startup).
+        # Python int has arbitrary precision, so overflow is not a concern.
         self._counters: dict[str, int] = {
             "bytes_in": 0,
             "bytes_out": 0,
@@ -279,6 +282,29 @@ class HubService:
                     rid.hex() if isinstance(rid, bytes) else rid,
                 )
 
+    def _cleanup_all_expired_expectations(self) -> None:
+        """Remove expired resource expectations across all links.
+
+        Entries are dropped while ``_state_lock`` is held; the debug
+        messages are emitted only after the lock has been released.
+        """
+        now = time.time()
+        expired = []
+        with self._state_lock:
+            for link, exp_dict in self._resource_expectations.items():
+                stale = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now]
+                for rid in stale:
+                    exp_dict.pop(rid, None)
+                    expired.append((link, rid))
+
+        # Keep logging out of the critical section.
+        for link, rid in expired:
+            self.log.debug(
+                "Expired resource expectation link_id=%s rid=%s",
+                self._fmt_link_id(link),
+                rid.hex() if isinstance(rid, bytes) else rid,
+            )
+
     def _add_resource_expectation(
         self,
         link: RNS.Link,
@@ -354,9 +380,12 @@ class HubService:
         """
         Callback when a Resource is advertised by remote peer.
         Returns True to accept, False to reject.
+
+        Minimize lock scope to prevent potential deadlocks with RNS internal locks.
         """
         link = resource.link
 
+        # Check config outside lock (immutable during runtime)
         if not self.config.enable_resource_transfer:
             self.log.debug(
                 "Rejecting resource (disabled) link_id=%s",
@@ -365,6 +394,19 @@ class HubService:
             self._inc("resources_rejected")
             return False
 
+        # Check size limit (immutable config)
+        size = resource.total_size if hasattr(resource, "total_size") else resource.size
+        if size > self.config.max_resource_bytes:
+            self.log.warning(
+                "Rejecting resource (too large: %s > %s) link_id=%s",
+                size,
+                self.config.max_resource_bytes,
+                self._fmt_link_id(link),
+            )
+            self._inc("resources_rejected")
+            return False
+
+        # Check session exists and find expectation with minimal lock scope
         with self._state_lock:
             sess = self.sessions.get(link)
             if not sess:
@@ -375,38 +417,30 @@ class HubService:
                 self._inc("resources_rejected")
                 return False
 
-            # Check size limit
-            size = resource.total_size if hasattr(resource, "total_size") else resource.size
-            if size > self.config.max_resource_bytes:
-                self.log.warning(
-                    "Rejecting resource (too large: %s > %s) link_id=%s",
-                    size,
-                    self.config.max_resource_bytes,
-                    self._fmt_link_id(link),
-                )
-                self._inc("resources_rejected")
-                return False
-
-            # Check for matching expectation
+            # Check the session, match the expectation and register the
+            # transfer under one lock acquisition so the three steps are
+            # observed atomically.
             exp = self._find_resource_expectation(link, size)
-            if not exp:
-                self.log.warning(
-                    "Rejecting resource (no matching expectation) link_id=%s size=%s",
-                    self._fmt_link_id(link),
-                    size,
-                )
-                self._inc("resources_rejected")
-                return False
-
-            # Accept
-            self.log.info(
-                "Accepting resource link_id=%s size=%s kind=%s",
-                self._fmt_link_id(link),
-                size,
-                exp.kind,
-            )
-            self._active_resources.setdefault(link, set()).add(resource)
-            return True
+            if exp:
+                self._active_resources.setdefault(link, set()).add(resource)
+
+        # Log the outcome only after the lock has been released
+        if not exp:
+            self.log.warning(
+                "Rejecting resource (no matching expectation) link_id=%s size=%s",
+                self._fmt_link_id(link),
+                size,
+            )
+            self._inc("resources_rejected")
+            return False
+
+        self.log.info(
+            "Accepting resource link_id=%s size=%s kind=%s",
+            self._fmt_link_id(link),
+            size,
+            exp.kind,
+        )
+        return True
 
     def _resource_concluded(self, resource: RNS.Resource) -> None:
         """Callback when a Resource transfer completes."""
@@ -804,6 +838,13 @@ class HubService:
         )
         self._prune_thread.start()
 
+        # Start resource cleanup thread if resource transfer is enabled
+        if self.config.enable_resource_transfer:
+            self._resource_cleanup_thread = threading.Thread(
+                target=self._resource_cleanup_loop, name="rrcd-resource-cleanup", daemon=True
+            )
+            self._resource_cleanup_thread.start()
+
     def _announce_once(self) -> None:
         if self.destination is None:
             return