diff --git a/CHANGELOG.md b/CHANGELOG.md index 75f9035..ef81c83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ This project follows the versioning policy in VERSIONING.md. - Consolidated version number to single source in `rrcd/__init__.py` (pyproject.toml now reads it dynamically) - Documentation updates for new command and mode in README.md and EX1-RRCD.md +### Minor fixes + +- Fix potential deadlock in _resource_advertised +- Add resource timeout cleanup +- Improve notice as resource handling and probe for link MDU with fallback +- Improve nickname updates, O(1) lookups, nick tracking, disambiguation on multiple matches + ## 0.1.2 - 2026-01-01 diff --git a/rrcd/service.py b/rrcd/service.py index 6e970c8..12af9ba 100644 --- a/rrcd/service.py +++ b/rrcd/service.py @@ -95,6 +95,11 @@ class HubService: self._trusted: set[bytes] = set() self._banned: set[bytes] = set() + # Secondary indexes for efficient link lookups (O(1) instead of O(n)). + # These are maintained alongside sessions and must stay in sync. + self._index_by_hash: dict[bytes, RNS.Link] = {} # identity hash -> link + self._index_by_nick: dict[str, set[RNS.Link]] = {} # normalized nick (lowercase) -> links + # Room state (hub-local conventions; no new on-wire message types). # _room_state holds active in-memory state (and registered state for empty rooms). # _room_registry holds registered rooms loaded from config. @@ -106,11 +111,14 @@ class HubService: self._ping_thread: threading.Thread | None = None self._announce_thread: threading.Thread | None = None + self._resource_cleanup_thread: threading.Thread | None = None self._config_write_lock = threading.Lock() self._started_wall_time: float | None = None self._started_monotonic: float | None = None + # Lifetime counters for uptime statistics (monotonically increasing after startup). + # Python int has arbitrary precision, so overflow is not a concern. self._counters: dict[str, int] = { "bytes_in": 0, "bytes_out": 0, @@ -156,7 +164,12 @@ class HubService: return "-" def _packet_would_fit(self, link: RNS.Link, payload: bytes) -> bool: + """Check if payload fits within link MDU without creating/packing packets.""" try: + # Query link MDU directly if available (more efficient than packing) + if hasattr(link, 'MDU') and link.MDU is not None: + return len(payload) <= link.MDU + # Fall back to packet creation if MDU not available pkt = RNS.Packet(link, payload) pkt.pack() return True @@ -261,6 +274,21 @@ class HubService: except Exception: pass + def _update_nick_index(self, link: RNS.Link, old_nick: str | None, new_nick: str | None) -> None: + """Update nick index when a nick changes. Must be called under _state_lock.""" + # Remove old nick mapping + if old_nick: + old_key = old_nick.strip().lower() + if old_key in self._index_by_nick: + self._index_by_nick[old_key].discard(link) + if not self._index_by_nick[old_key]: + self._index_by_nick.pop(old_key, None) + + # Add new nick mapping + if new_nick: + new_key = new_nick.strip().lower() + self._index_by_nick.setdefault(new_key, set()).add(link) + # Resource transfer methods def _cleanup_expired_expectations(self, link: RNS.Link) -> None: @@ -279,6 +307,23 @@ class HubService: rid.hex() if isinstance(rid, bytes) else rid, ) + def _cleanup_all_expired_expectations(self) -> None: + """Cleanup expired resource expectations across all links.""" + now = time.time() + with self._state_lock: + for link, exp_dict in list(self._resource_expectations.items()): + if not exp_dict: + continue + + expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now] + for rid in expired: + exp_dict.pop(rid, None) + self.log.debug( + "Expired resource expectation link_id=%s rid=%s", + self._fmt_link_id(link), + rid.hex() if isinstance(rid, bytes) else rid, + ) + def _add_resource_expectation( self, link: RNS.Link, @@ -354,9 +399,12 @@ class HubService: """ Callback when a Resource is advertised by remote peer. Returns True to accept, False to reject. + + Minimize lock scope to prevent potential deadlocks with RNS internal locks. """ link = resource.link + # Check config outside lock (immutable during runtime) if not self.config.enable_resource_transfer: self.log.debug( "Rejecting resource (disabled) link_id=%s", @@ -365,6 +413,19 @@ class HubService: self._inc("resources_rejected") return False + # Check size limit (immutable config) + size = resource.total_size if hasattr(resource, "total_size") else resource.size + if size > self.config.max_resource_bytes: + self.log.warning( + "Rejecting resource (too large: %s > %s) link_id=%s", + size, + self.config.max_resource_bytes, + self._fmt_link_id(link), + ) + self._inc("resources_rejected") + return False + + # Check session exists and find expectation with minimal lock scope with self._state_lock: sess = self.sessions.get(link) if not sess: @@ -375,38 +436,31 @@ class HubService: self._inc("resources_rejected") return False - # Check size limit - size = resource.total_size if hasattr(resource, "total_size") else resource.size - if size > self.config.max_resource_bytes: - self.log.warning( - "Rejecting resource (too large: %s > %s) link_id=%s", - size, - self.config.max_resource_bytes, - self._fmt_link_id(link), - ) - self._inc("resources_rejected") - return False - - # Check for matching expectation + # Find matching expectation exp = self._find_resource_expectation(link, size) - if not exp: - self.log.warning( - "Rejecting resource (no matching expectation) link_id=%s size=%s", - self._fmt_link_id(link), - size, - ) - self._inc("resources_rejected") - return False - - # Accept - self.log.info( - "Accepting resource link_id=%s size=%s kind=%s", + + # Check expectation outside lock + if not exp: + self.log.warning( + "Rejecting resource (no matching expectation) link_id=%s size=%s", self._fmt_link_id(link), size, - exp.kind, ) + self._inc("resources_rejected") + return False + + # Accept and register with minimal lock scope + self.log.info( + "Accepting resource link_id=%s size=%s kind=%s", + self._fmt_link_id(link), + size, + exp.kind, + ) + + with self._state_lock: self._active_resources.setdefault(link, set()).add(resource) - return True + + return True def _resource_concluded(self, resource: RNS.Resource) -> None: """Callback when a Resource transfer completes.""" @@ -504,15 +558,50 @@ class HubService: ) return - # Log the notice (don't send back to sender) self.log.info( "Received large NOTICE via resource link_id=%s room=%r chars=%s", self._fmt_link_id(link), exp.room, len(text), ) - # Note: In a full implementation, this would be forwarded to other room members - # For now, just acknowledge receipt + + # Forward NOTICE to room members if room is specified + if exp.room and self.identity is not None: + with self._state_lock: + sess = self.sessions.get(link) + peer_hash = sess.get("peer") if sess else None + room_members = self.rooms.get(exp.room, set()) + + if peer_hash and room_members: + notice_env = make_envelope( + T_NOTICE, + src=peer_hash, + room=exp.room, + body=text, + ) + notice_payload = encode(notice_env) + + # Forward to all room members except sender + forwarded = 0 + for other in room_members: + if other != link: + try: + other.packet(notice_payload) + forwarded += 1 + except Exception as e: + self.log.warning( + "Failed to forward NOTICE resource link_id=%s: %s", + self._fmt_link_id(other), + e, + ) + + if forwarded > 0: + self._inc("notices_forwarded") + self.log.debug( + "Forwarded NOTICE resource to %d members room=%s", + forwarded, + exp.room, + ) elif exp.kind == RES_KIND_MOTD: # Similar to NOTICE @@ -804,6 +893,13 @@ class HubService: ) self._prune_thread.start() + # Start resource cleanup thread if resource transfer is enabled + if self.config.enable_resource_transfer: + self._resource_cleanup_thread = threading.Thread( + target=self._resource_cleanup_loop, name="rrcd-resource-cleanup", daemon=True + ) + self._resource_cleanup_thread.start() + def _announce_once(self) -> None: if self.destination is None: return @@ -1521,6 +1617,9 @@ class HubService: def _resolve_identity_hash( self, token: str, *, room: str | None = None ) -> bytes | None: + """Resolve token to identity hash. Returns hash if successful, None otherwise. + For ambiguous matches, use _resolve_identity_hash_with_matches instead. + """ target_link = self._find_target_link(token, room=room) if target_link is not None: s = self.sessions.get(target_link) @@ -1532,6 +1631,32 @@ class HubService: except Exception: return None + def _resolve_identity_hash_with_matches( + self, token: str, *, room: str | None = None + ) -> tuple[bytes | None, list[RNS.Link]]: + """Resolve token to identity hash, also returning all matching links. + Returns (hash, matches) tuple. Hash is None if ambiguous or not found. + Use matches list to provide helpful error messages. + """ + matches = self._find_target_links(token, room=room) + + if len(matches) == 1: + # Exactly one match - get hash from session + s = self.sessions.get(matches[0]) + ph = s.get("peer") if s else None + if isinstance(ph, (bytes, bytearray)): + return (bytes(ph), matches) + elif len(matches) > 1: + # Ambiguous - return None hash but provide matches for error message + return (None, matches) + + # No matches from nick/hash-prefix lookup - try raw hash parse + try: + h = self._parse_identity_hash(token) + return (h, []) + except Exception: + return (None, []) + def _persist_room_state_to_registry(self, link: RNS.Link, room: str | None) -> None: if room is None: return @@ -1957,15 +2082,24 @@ class HubService: ) ) - return "\n".join(lines) + return "".join(lines) def _find_target_link(self, token: str, room: str | None = None) -> RNS.Link | None: + """Find a link by nick or identity hash prefix. Uses indexes for O(1) lookups. + Returns the link if exactly one match, None otherwise. + """ + result = self._find_target_links(token, room) + if len(result) == 1: + return result[0] + return None + + def _find_target_links(self, token: str, room: str | None = None) -> list[RNS.Link]: + """Find all links matching a nick or identity hash prefix. + Returns list of matching links (empty if none, multiple if ambiguous). + """ t = token.strip().lower() if not t: - return None - - with self._state_lock: - items = list(self.sessions.items()) + return [] # If it's hex-like, treat as an identity hash prefix. hex_candidate = t[2:] if t.startswith("0x") else t @@ -1977,31 +2111,67 @@ class HubService: prefix = bytes.fromhex(hex_candidate) except Exception: prefix = None + if prefix is not None: - matches: list[RNS.Link] = [] - for candidate_link, sess in items: - ph = sess.get("peer") - if isinstance(ph, (bytes, bytearray)) and bytes(ph).startswith( - prefix - ): - if room is not None and room not in sess.get("rooms", set()): - continue - matches.append(candidate_link) - if len(matches) == 1: - return matches[0] - return None + with self._state_lock: + # Search hash index for matching prefixes + matches: list[RNS.Link] = [] + for peer_hash, candidate_link in self._index_by_hash.items(): + if peer_hash.startswith(prefix): + # Check room membership if specified + if room is not None: + sess = self.sessions.get(candidate_link) + if sess and room not in sess.get("rooms", set()): + continue + matches.append(candidate_link) + + return matches - # Otherwise treat as nickname (best-effort). - matches = [] - for candidate_link, sess in items: - nick = sess.get("nick") - if isinstance(nick, str) and nick.strip().lower() == t: - if room is not None and room not in sess.get("rooms", set()): + # Otherwise treat as nickname - use nick index for O(1) lookup + with self._state_lock: + candidate_links = self._index_by_nick.get(t, set()) + if not candidate_links: + return [] + + # Filter by room membership if specified + if room is not None: + matches = [] + for candidate_link in candidate_links: + sess = self.sessions.get(candidate_link) + if sess and room in sess.get("rooms", set()): + matches.append(candidate_link) + else: + matches = list(candidate_links) + + return matches + + def _format_ambiguous_targets( + self, token: str, matches: list[RNS.Link] + ) -> str: + """Format a helpful message when target lookup is ambiguous.""" + if not matches: + return f"target '{token}' not found" + + with self._state_lock: + items = [] + for match_link in matches: + sess = self.sessions.get(match_link) + if not sess: continue - matches.append(candidate_link) - if len(matches) == 1: - return matches[0] - return None + peer = sess.get("peer") + nick = sess.get("nick") + hash_str = self._fmt_hash(peer, prefix=16) if peer else "?" + nick_str = f"nick={nick!r}" if nick else "(no nick)" + items.append(f"{hash_str} {nick_str}") + + if len(items) == 0: + return f"target '{token}' not found" + + return ( + f"ambiguous: '{token}' matches {len(items)} identities:\n" + + "\n".join(f" - {item}" for item in items) + + "\nUse full or longer identity hash to disambiguate." + ) def _handle_operator_command( self, @@ -2157,8 +2327,10 @@ class HubService: target_link = self._find_target_link(target, room=r) if target_link is None: + # Check if ambiguous or just not found + all_matches = self._find_target_links(target, room=r) self._emit_notice( - outgoing, link, room, "target not found (or ambiguous)" + outgoing, link, room, self._format_ambiguous_targets(target, all_matches) ) return True @@ -2248,6 +2420,16 @@ class HubService: self._emit_notice(outgoing, link, None, f"kline added for {target}") return True + # Not found as active link - check if ambiguous or try as raw hash + all_matches = self._find_target_links(target, room=None) + if all_matches: + # Ambiguous + self._emit_notice( + outgoing, link, None, self._format_ambiguous_targets(target, all_matches) + ) + return True + + # Try as raw hash try: h = self._parse_identity_hash(target) except Exception as e: @@ -2467,12 +2649,14 @@ class HubService: room=r, ) return True - target_hash = self._resolve_identity_hash(parts[2], room=r) + + target_hash, all_matches = self._resolve_identity_hash_with_matches(parts[2], room=r) if target_hash is None: self._emit_notice( - outgoing, link, room, "target not found (or invalid hash)" + outgoing, link, room, self._format_ambiguous_targets(parts[2], all_matches) ) return True + st = self._room_state_ensure(r) founder = st.get("founder") founder_b = ( @@ -2614,10 +2798,10 @@ class HubService: ) return True - target_hash = self._resolve_identity_hash(parts[3], room=r) + target_hash, all_matches = self._resolve_identity_hash_with_matches(parts[3], room=r) if target_hash is None: self._emit_notice( - outgoing, link, room, "target not found (or invalid hash)" + outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches) ) return True @@ -2738,10 +2922,10 @@ class HubService: ) return True - target_hash = self._resolve_identity_hash(parts[3], room=r) + target_hash, all_matches = self._resolve_identity_hash_with_matches(parts[3], room=r) if target_hash is None: self._emit_notice( - outgoing, link, room, "target not found (or invalid hash)" + outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches) ) return True @@ -2867,12 +3051,14 @@ class HubService: token = parts[3] target_link = self._find_target_link(token, room=None) if target_link is None: + # Check if ambiguous or just not found + all_matches = self._find_target_links(token, room=None) if self.identity is not None: self._emit_error( outgoing, link, src=self.identity.hash, - text="invite failed: target is offline or ambiguous", + text=f"invite failed: {self._format_ambiguous_targets(token, all_matches)}", room=r, ) return True @@ -2933,10 +3119,10 @@ class HubService: ) return True - target_hash = self._resolve_identity_hash(parts[3], room=None) + target_hash, all_matches = self._resolve_identity_hash_with_matches(parts[3], room=None) if target_hash is None: self._emit_notice( - outgoing, link, room, "target not found (or invalid hash)" + outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches) ) return True @@ -3090,6 +3276,13 @@ class HubService: peer = sess.get("peer") nick = sess.get("nick") rooms_count = len(sess.get("rooms") or ()) + + # Clean up indexes + if isinstance(peer, (bytes, bytearray)): + self._index_by_hash.pop(bytes(peer), None) + + if nick: + self._update_nick_index(link, nick, None) for room in list(sess["rooms"]): self.rooms.get(room, set()).discard(link) @@ -3395,22 +3588,31 @@ class HubService: ) return + old_nick = sess.get("nick") + new_nick = None + if isinstance(nick, str): n = normalize_nick(nick, max_chars=self.config.nick_max_chars) if n is not None: + new_nick = n sess["nick"] = n if isinstance(body, dict): sess["peer_caps"] = self._extract_caps(body) # Back-compat: if a legacy client put nick in HELLO body, accept it. - if sess.get("nick") is None: + if new_nick is None: legacy_nick = body.get(B_HELLO_NICK_LEGACY) n2 = normalize_nick( legacy_nick, max_chars=self.config.nick_max_chars ) if n2 is not None: + new_nick = n2 sess["nick"] = n2 + + # Update nick index if nick changed + if old_nick != new_nick: + self._update_nick_index(link, old_nick, new_nick) self.log.info( "HELLO peer=%s nick=%r link_id=%s", @@ -3433,26 +3635,35 @@ class HubService: # (can happen when client restarts but RNS reuses deterministic link_id) if self.identity is not None: # Reset session state and process as new HELLO + old_nick = sess.get("nick") sess["welcomed"] = False sess["rooms"] = set() sess["nick"] = None sess["peer_caps"] = {} + + new_nick = None # Process the HELLO message if isinstance(nick, str): n = normalize_nick(nick, max_chars=self.config.nick_max_chars) if n is not None: + new_nick = n sess["nick"] = n if isinstance(body, dict): sess["peer_caps"] = self._extract_caps(body) - if sess.get("nick") is None: + if new_nick is None: legacy_nick = body.get(B_HELLO_NICK_LEGACY) n2 = normalize_nick( legacy_nick, max_chars=self.config.nick_max_chars ) if n2 is not None: + new_nick = n2 sess["nick"] = n2 + + # Update nick index if nick changed + if old_nick != new_nick: + self._update_nick_index(link, old_nick, new_nick) self.log.info( "Re-HELLO peer=%s nick=%r link_id=%s", @@ -3797,6 +4008,11 @@ class HubService: # Client provided a nickname in this message - validate and preserve it n = normalize_nick(incoming_nick, max_chars=self.config.nick_max_chars) if n is not None: + # Update session nick and index if it changed + old_session_nick = sess.get("nick") + if old_session_nick != n: + sess["nick"] = n + self._update_nick_index(link, old_session_nick, n) env[K_NICK] = n else: # Invalid nickname provided - remove it