mirror of
https://github.com/kc1awv/rrcd.git
synced 2026-06-14 00:23:34 -07:00
fix potential deadlock in _resource_advertised
add resource timeout cleanup
This commit is contained in:
+63
-27
@@ -106,11 +106,14 @@ class HubService:
|
||||
|
||||
self._ping_thread: threading.Thread | None = None
|
||||
self._announce_thread: threading.Thread | None = None
|
||||
self._resource_cleanup_thread: threading.Thread | None = None
|
||||
|
||||
self._config_write_lock = threading.Lock()
|
||||
|
||||
self._started_wall_time: float | None = None
|
||||
self._started_monotonic: float | None = None
|
||||
# Lifetime counters for uptime statistics (monotonically increasing after startup).
|
||||
# Python int has arbitrary precision, so overflow is not a concern.
|
||||
self._counters: dict[str, int] = {
|
||||
"bytes_in": 0,
|
||||
"bytes_out": 0,
|
||||
@@ -279,6 +282,23 @@ class HubService:
|
||||
rid.hex() if isinstance(rid, bytes) else rid,
|
||||
)
|
||||
|
||||
def _cleanup_all_expired_expectations(self) -> None:
|
||||
"""Cleanup expired resource expectations across all links."""
|
||||
now = time.time()
|
||||
with self._state_lock:
|
||||
for link, exp_dict in list(self._resource_expectations.items()):
|
||||
if not exp_dict:
|
||||
continue
|
||||
|
||||
expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now]
|
||||
for rid in expired:
|
||||
exp_dict.pop(rid, None)
|
||||
self.log.debug(
|
||||
"Expired resource expectation link_id=%s rid=%s",
|
||||
self._fmt_link_id(link),
|
||||
rid.hex() if isinstance(rid, bytes) else rid,
|
||||
)
|
||||
|
||||
def _add_resource_expectation(
|
||||
self,
|
||||
link: RNS.Link,
|
||||
@@ -354,9 +374,12 @@ class HubService:
|
||||
"""
|
||||
Callback when a Resource is advertised by remote peer.
|
||||
Returns True to accept, False to reject.
|
||||
|
||||
Minimize lock scope to prevent potential deadlocks with RNS internal locks.
|
||||
"""
|
||||
link = resource.link
|
||||
|
||||
# Check config outside lock (immutable during runtime)
|
||||
if not self.config.enable_resource_transfer:
|
||||
self.log.debug(
|
||||
"Rejecting resource (disabled) link_id=%s",
|
||||
@@ -365,6 +388,19 @@ class HubService:
|
||||
self._inc("resources_rejected")
|
||||
return False
|
||||
|
||||
# Check size limit (immutable config)
|
||||
size = resource.total_size if hasattr(resource, "total_size") else resource.size
|
||||
if size > self.config.max_resource_bytes:
|
||||
self.log.warning(
|
||||
"Rejecting resource (too large: %s > %s) link_id=%s",
|
||||
size,
|
||||
self.config.max_resource_bytes,
|
||||
self._fmt_link_id(link),
|
||||
)
|
||||
self._inc("resources_rejected")
|
||||
return False
|
||||
|
||||
# Check session exists and find expectation with minimal lock scope
|
||||
with self._state_lock:
|
||||
sess = self.sessions.get(link)
|
||||
if not sess:
|
||||
@@ -375,38 +411,31 @@ class HubService:
|
||||
self._inc("resources_rejected")
|
||||
return False
|
||||
|
||||
# Check size limit
|
||||
size = resource.total_size if hasattr(resource, "total_size") else resource.size
|
||||
if size > self.config.max_resource_bytes:
|
||||
self.log.warning(
|
||||
"Rejecting resource (too large: %s > %s) link_id=%s",
|
||||
size,
|
||||
self.config.max_resource_bytes,
|
||||
self._fmt_link_id(link),
|
||||
)
|
||||
self._inc("resources_rejected")
|
||||
return False
|
||||
|
||||
# Check for matching expectation
|
||||
# Find matching expectation
|
||||
exp = self._find_resource_expectation(link, size)
|
||||
if not exp:
|
||||
self.log.warning(
|
||||
"Rejecting resource (no matching expectation) link_id=%s size=%s",
|
||||
self._fmt_link_id(link),
|
||||
size,
|
||||
)
|
||||
self._inc("resources_rejected")
|
||||
return False
|
||||
|
||||
# Accept
|
||||
self.log.info(
|
||||
"Accepting resource link_id=%s size=%s kind=%s",
|
||||
|
||||
# Check expectation outside lock
|
||||
if not exp:
|
||||
self.log.warning(
|
||||
"Rejecting resource (no matching expectation) link_id=%s size=%s",
|
||||
self._fmt_link_id(link),
|
||||
size,
|
||||
exp.kind,
|
||||
)
|
||||
self._inc("resources_rejected")
|
||||
return False
|
||||
|
||||
# Accept and register with minimal lock scope
|
||||
self.log.info(
|
||||
"Accepting resource link_id=%s size=%s kind=%s",
|
||||
self._fmt_link_id(link),
|
||||
size,
|
||||
exp.kind,
|
||||
)
|
||||
|
||||
with self._state_lock:
|
||||
self._active_resources.setdefault(link, set()).add(resource)
|
||||
return True
|
||||
|
||||
return True
|
||||
|
||||
def _resource_concluded(self, resource: RNS.Resource) -> None:
|
||||
"""Callback when a Resource transfer completes."""
|
||||
@@ -804,6 +833,13 @@ class HubService:
|
||||
)
|
||||
self._prune_thread.start()
|
||||
|
||||
# Start resource cleanup thread if resource transfer is enabled
|
||||
if self.config.enable_resource_transfer:
|
||||
self._resource_cleanup_thread = threading.Thread(
|
||||
target=self._resource_cleanup_loop, name="rrcd-resource-cleanup", daemon=True
|
||||
)
|
||||
self._resource_cleanup_thread.start()
|
||||
|
||||
def _announce_once(self) -> None:
|
||||
if self.destination is None:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user