Merge pull request #9 from kc1awv/bugsquash

Bugsquash
This commit is contained in:
Steve Miller
2026-01-06 10:29:50 -05:00
committed by GitHub
2 changed files with 292 additions and 69 deletions
+7
View File
@@ -11,6 +11,13 @@ This project follows the versioning policy in VERSIONING.md.
- Consolidated version number to single source in `rrcd/__init__.py` (pyproject.toml now reads it dynamically)
- Documentation updates for new command and mode in README.md and EX1-RRCD.md
### Minor fixes
- Fix potential deadlock in _resource_advertised
- Add resource timeout cleanup
- Improve notice as resource handling and probe for link MDU with fallback
- Improve nickname updates, O(1) lookups, nick tracking, disambiguation on multiple matches
## 0.1.2 - 2026-01-01
+285 -69
View File
@@ -95,6 +95,11 @@ class HubService:
self._trusted: set[bytes] = set()
self._banned: set[bytes] = set()
# Secondary indexes for efficient link lookups (O(1) instead of O(n)).
# These are maintained alongside sessions and must stay in sync.
self._index_by_hash: dict[bytes, RNS.Link] = {} # identity hash -> link
self._index_by_nick: dict[str, set[RNS.Link]] = {} # normalized nick (lowercase) -> links
# Room state (hub-local conventions; no new on-wire message types).
# _room_state holds active in-memory state (and registered state for empty rooms).
# _room_registry holds registered rooms loaded from config.
@@ -106,11 +111,14 @@ class HubService:
self._ping_thread: threading.Thread | None = None
self._announce_thread: threading.Thread | None = None
self._resource_cleanup_thread: threading.Thread | None = None
self._config_write_lock = threading.Lock()
self._started_wall_time: float | None = None
self._started_monotonic: float | None = None
# Lifetime counters for uptime statistics (monotonically increasing after startup).
# Python int has arbitrary precision, so overflow is not a concern.
self._counters: dict[str, int] = {
"bytes_in": 0,
"bytes_out": 0,
@@ -156,7 +164,12 @@ class HubService:
return "-"
def _packet_would_fit(self, link: RNS.Link, payload: bytes) -> bool:
"""Check if payload fits within link MDU without creating/packing packets."""
try:
# Query link MDU directly if available (more efficient than packing)
if hasattr(link, 'MDU') and link.MDU is not None:
return len(payload) <= link.MDU
# Fall back to packet creation if MDU not available
pkt = RNS.Packet(link, payload)
pkt.pack()
return True
@@ -261,6 +274,21 @@ class HubService:
except Exception:
pass
def _update_nick_index(self, link: RNS.Link, old_nick: str | None, new_nick: str | None) -> None:
"""Update nick index when a nick changes. Must be called under _state_lock."""
# Remove old nick mapping
if old_nick:
old_key = old_nick.strip().lower()
if old_key in self._index_by_nick:
self._index_by_nick[old_key].discard(link)
if not self._index_by_nick[old_key]:
self._index_by_nick.pop(old_key, None)
# Add new nick mapping
if new_nick:
new_key = new_nick.strip().lower()
self._index_by_nick.setdefault(new_key, set()).add(link)
# Resource transfer methods
def _cleanup_expired_expectations(self, link: RNS.Link) -> None:
@@ -279,6 +307,23 @@ class HubService:
rid.hex() if isinstance(rid, bytes) else rid,
)
def _cleanup_all_expired_expectations(self) -> None:
"""Cleanup expired resource expectations across all links."""
now = time.time()
with self._state_lock:
for link, exp_dict in list(self._resource_expectations.items()):
if not exp_dict:
continue
expired = [rid for rid, exp in exp_dict.items() if exp.expires_at <= now]
for rid in expired:
exp_dict.pop(rid, None)
self.log.debug(
"Expired resource expectation link_id=%s rid=%s",
self._fmt_link_id(link),
rid.hex() if isinstance(rid, bytes) else rid,
)
def _add_resource_expectation(
self,
link: RNS.Link,
@@ -354,9 +399,12 @@ class HubService:
"""
Callback when a Resource is advertised by remote peer.
Returns True to accept, False to reject.
Minimize lock scope to prevent potential deadlocks with RNS internal locks.
"""
link = resource.link
# Check config outside lock (immutable during runtime)
if not self.config.enable_resource_transfer:
self.log.debug(
"Rejecting resource (disabled) link_id=%s",
@@ -365,6 +413,19 @@ class HubService:
self._inc("resources_rejected")
return False
# Check size limit (immutable config)
size = resource.total_size if hasattr(resource, "total_size") else resource.size
if size > self.config.max_resource_bytes:
self.log.warning(
"Rejecting resource (too large: %s > %s) link_id=%s",
size,
self.config.max_resource_bytes,
self._fmt_link_id(link),
)
self._inc("resources_rejected")
return False
# Check session exists and find expectation with minimal lock scope
with self._state_lock:
sess = self.sessions.get(link)
if not sess:
@@ -375,38 +436,31 @@ class HubService:
self._inc("resources_rejected")
return False
# Check size limit
size = resource.total_size if hasattr(resource, "total_size") else resource.size
if size > self.config.max_resource_bytes:
self.log.warning(
"Rejecting resource (too large: %s > %s) link_id=%s",
size,
self.config.max_resource_bytes,
self._fmt_link_id(link),
)
self._inc("resources_rejected")
return False
# Check for matching expectation
# Find matching expectation
exp = self._find_resource_expectation(link, size)
if not exp:
self.log.warning(
"Rejecting resource (no matching expectation) link_id=%s size=%s",
self._fmt_link_id(link),
size,
)
self._inc("resources_rejected")
return False
# Accept
self.log.info(
"Accepting resource link_id=%s size=%s kind=%s",
# Check expectation outside lock
if not exp:
self.log.warning(
"Rejecting resource (no matching expectation) link_id=%s size=%s",
self._fmt_link_id(link),
size,
exp.kind,
)
self._inc("resources_rejected")
return False
# Accept and register with minimal lock scope
self.log.info(
"Accepting resource link_id=%s size=%s kind=%s",
self._fmt_link_id(link),
size,
exp.kind,
)
with self._state_lock:
self._active_resources.setdefault(link, set()).add(resource)
return True
return True
def _resource_concluded(self, resource: RNS.Resource) -> None:
"""Callback when a Resource transfer completes."""
@@ -504,15 +558,50 @@ class HubService:
)
return
# Log the notice (don't send back to sender)
self.log.info(
"Received large NOTICE via resource link_id=%s room=%r chars=%s",
self._fmt_link_id(link),
exp.room,
len(text),
)
# Note: In a full implementation, this would be forwarded to other room members
# For now, just acknowledge receipt
# Forward NOTICE to room members if room is specified
if exp.room and self.identity is not None:
with self._state_lock:
sess = self.sessions.get(link)
peer_hash = sess.get("peer") if sess else None
room_members = self.rooms.get(exp.room, set())
if peer_hash and room_members:
notice_env = make_envelope(
T_NOTICE,
src=peer_hash,
room=exp.room,
body=text,
)
notice_payload = encode(notice_env)
# Forward to all room members except sender
forwarded = 0
for other in room_members:
if other != link:
try:
other.packet(notice_payload)
forwarded += 1
except Exception as e:
self.log.warning(
"Failed to forward NOTICE resource link_id=%s: %s",
self._fmt_link_id(other),
e,
)
if forwarded > 0:
self._inc("notices_forwarded")
self.log.debug(
"Forwarded NOTICE resource to %d members room=%s",
forwarded,
exp.room,
)
elif exp.kind == RES_KIND_MOTD:
# Similar to NOTICE
@@ -804,6 +893,13 @@ class HubService:
)
self._prune_thread.start()
# Start resource cleanup thread if resource transfer is enabled
if self.config.enable_resource_transfer:
self._resource_cleanup_thread = threading.Thread(
target=self._resource_cleanup_loop, name="rrcd-resource-cleanup", daemon=True
)
self._resource_cleanup_thread.start()
def _announce_once(self) -> None:
if self.destination is None:
return
@@ -1521,6 +1617,9 @@ class HubService:
def _resolve_identity_hash(
self, token: str, *, room: str | None = None
) -> bytes | None:
"""Resolve token to identity hash. Returns hash if successful, None otherwise.
For ambiguous matches, use _resolve_identity_hash_with_matches instead.
"""
target_link = self._find_target_link(token, room=room)
if target_link is not None:
s = self.sessions.get(target_link)
@@ -1532,6 +1631,32 @@ class HubService:
except Exception:
return None
def _resolve_identity_hash_with_matches(
self, token: str, *, room: str | None = None
) -> tuple[bytes | None, list[RNS.Link]]:
"""Resolve token to identity hash, also returning all matching links.
Returns (hash, matches) tuple. Hash is None if ambiguous or not found.
Use matches list to provide helpful error messages.
"""
matches = self._find_target_links(token, room=room)
if len(matches) == 1:
# Exactly one match - get hash from session
s = self.sessions.get(matches[0])
ph = s.get("peer") if s else None
if isinstance(ph, (bytes, bytearray)):
return (bytes(ph), matches)
elif len(matches) > 1:
# Ambiguous - return None hash but provide matches for error message
return (None, matches)
# No matches from nick/hash-prefix lookup - try raw hash parse
try:
h = self._parse_identity_hash(token)
return (h, [])
except Exception:
return (None, [])
def _persist_room_state_to_registry(self, link: RNS.Link, room: str | None) -> None:
if room is None:
return
@@ -1957,15 +2082,24 @@ class HubService:
)
)
return "\n".join(lines)
return "".join(lines)
def _find_target_link(self, token: str, room: str | None = None) -> RNS.Link | None:
"""Find a link by nick or identity hash prefix. Uses indexes for O(1) lookups.
Returns the link if exactly one match, None otherwise.
"""
result = self._find_target_links(token, room)
if len(result) == 1:
return result[0]
return None
def _find_target_links(self, token: str, room: str | None = None) -> list[RNS.Link]:
"""Find all links matching a nick or identity hash prefix.
Returns list of matching links (empty if none, multiple if ambiguous).
"""
t = token.strip().lower()
if not t:
return None
with self._state_lock:
items = list(self.sessions.items())
return []
# If it's hex-like, treat as an identity hash prefix.
hex_candidate = t[2:] if t.startswith("0x") else t
@@ -1977,31 +2111,67 @@ class HubService:
prefix = bytes.fromhex(hex_candidate)
except Exception:
prefix = None
if prefix is not None:
matches: list[RNS.Link] = []
for candidate_link, sess in items:
ph = sess.get("peer")
if isinstance(ph, (bytes, bytearray)) and bytes(ph).startswith(
prefix
):
if room is not None and room not in sess.get("rooms", set()):
continue
matches.append(candidate_link)
if len(matches) == 1:
return matches[0]
return None
with self._state_lock:
# Search hash index for matching prefixes
matches: list[RNS.Link] = []
for peer_hash, candidate_link in self._index_by_hash.items():
if peer_hash.startswith(prefix):
# Check room membership if specified
if room is not None:
sess = self.sessions.get(candidate_link)
if sess and room not in sess.get("rooms", set()):
continue
matches.append(candidate_link)
return matches
# Otherwise treat as nickname (best-effort).
matches = []
for candidate_link, sess in items:
nick = sess.get("nick")
if isinstance(nick, str) and nick.strip().lower() == t:
if room is not None and room not in sess.get("rooms", set()):
# Otherwise treat as nickname - use nick index for O(1) lookup
with self._state_lock:
candidate_links = self._index_by_nick.get(t, set())
if not candidate_links:
return []
# Filter by room membership if specified
if room is not None:
matches = []
for candidate_link in candidate_links:
sess = self.sessions.get(candidate_link)
if sess and room in sess.get("rooms", set()):
matches.append(candidate_link)
else:
matches = list(candidate_links)
return matches
def _format_ambiguous_targets(
self, token: str, matches: list[RNS.Link]
) -> str:
"""Format a helpful message when target lookup is ambiguous."""
if not matches:
return f"target '{token}' not found"
with self._state_lock:
items = []
for match_link in matches:
sess = self.sessions.get(match_link)
if not sess:
continue
matches.append(candidate_link)
if len(matches) == 1:
return matches[0]
return None
peer = sess.get("peer")
nick = sess.get("nick")
hash_str = self._fmt_hash(peer, prefix=16) if peer else "?"
nick_str = f"nick={nick!r}" if nick else "(no nick)"
items.append(f"{hash_str} {nick_str}")
if len(items) == 0:
return f"target '{token}' not found"
return (
f"ambiguous: '{token}' matches {len(items)} identities:\n"
+ "\n".join(f" - {item}" for item in items)
+ "\nUse full or longer identity hash to disambiguate."
)
def _handle_operator_command(
self,
@@ -2157,8 +2327,10 @@ class HubService:
target_link = self._find_target_link(target, room=r)
if target_link is None:
# Check if ambiguous or just not found
all_matches = self._find_target_links(target, room=r)
self._emit_notice(
outgoing, link, room, "target not found (or ambiguous)"
outgoing, link, room, self._format_ambiguous_targets(target, all_matches)
)
return True
@@ -2248,6 +2420,16 @@ class HubService:
self._emit_notice(outgoing, link, None, f"kline added for {target}")
return True
# Not found as active link - check if ambiguous or try as raw hash
all_matches = self._find_target_links(target, room=None)
if all_matches:
# Ambiguous
self._emit_notice(
outgoing, link, None, self._format_ambiguous_targets(target, all_matches)
)
return True
# Try as raw hash
try:
h = self._parse_identity_hash(target)
except Exception as e:
@@ -2467,12 +2649,14 @@ class HubService:
room=r,
)
return True
target_hash = self._resolve_identity_hash(parts[2], room=r)
target_hash, all_matches = self._resolve_identity_hash_with_matches(parts[2], room=r)
if target_hash is None:
self._emit_notice(
outgoing, link, room, "target not found (or invalid hash)"
outgoing, link, room, self._format_ambiguous_targets(parts[2], all_matches)
)
return True
st = self._room_state_ensure(r)
founder = st.get("founder")
founder_b = (
@@ -2614,10 +2798,10 @@ class HubService:
)
return True
target_hash = self._resolve_identity_hash(parts[3], room=r)
target_hash, all_matches = self._resolve_identity_hash_with_matches(parts[3], room=r)
if target_hash is None:
self._emit_notice(
outgoing, link, room, "target not found (or invalid hash)"
outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches)
)
return True
@@ -2738,10 +2922,10 @@ class HubService:
)
return True
target_hash = self._resolve_identity_hash(parts[3], room=r)
target_hash, all_matches = self._resolve_identity_hash_with_matches(parts[3], room=r)
if target_hash is None:
self._emit_notice(
outgoing, link, room, "target not found (or invalid hash)"
outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches)
)
return True
@@ -2867,12 +3051,14 @@ class HubService:
token = parts[3]
target_link = self._find_target_link(token, room=None)
if target_link is None:
# Check if ambiguous or just not found
all_matches = self._find_target_links(token, room=None)
if self.identity is not None:
self._emit_error(
outgoing,
link,
src=self.identity.hash,
text="invite failed: target is offline or ambiguous",
text=f"invite failed: {self._format_ambiguous_targets(token, all_matches)}",
room=r,
)
return True
@@ -2933,10 +3119,10 @@ class HubService:
)
return True
target_hash = self._resolve_identity_hash(parts[3], room=None)
target_hash, all_matches = self._resolve_identity_hash_with_matches(parts[3], room=None)
if target_hash is None:
self._emit_notice(
outgoing, link, room, "target not found (or invalid hash)"
outgoing, link, room, self._format_ambiguous_targets(parts[3], all_matches)
)
return True
@@ -3090,6 +3276,13 @@ class HubService:
peer = sess.get("peer")
nick = sess.get("nick")
rooms_count = len(sess.get("rooms") or ())
# Clean up indexes
if isinstance(peer, (bytes, bytearray)):
self._index_by_hash.pop(bytes(peer), None)
if nick:
self._update_nick_index(link, nick, None)
for room in list(sess["rooms"]):
self.rooms.get(room, set()).discard(link)
@@ -3395,22 +3588,31 @@ class HubService:
)
return
old_nick = sess.get("nick")
new_nick = None
if isinstance(nick, str):
n = normalize_nick(nick, max_chars=self.config.nick_max_chars)
if n is not None:
new_nick = n
sess["nick"] = n
if isinstance(body, dict):
sess["peer_caps"] = self._extract_caps(body)
# Back-compat: if a legacy client put nick in HELLO body, accept it.
if sess.get("nick") is None:
if new_nick is None:
legacy_nick = body.get(B_HELLO_NICK_LEGACY)
n2 = normalize_nick(
legacy_nick, max_chars=self.config.nick_max_chars
)
if n2 is not None:
new_nick = n2
sess["nick"] = n2
# Update nick index if nick changed
if old_nick != new_nick:
self._update_nick_index(link, old_nick, new_nick)
self.log.info(
"HELLO peer=%s nick=%r link_id=%s",
@@ -3433,26 +3635,35 @@ class HubService:
# (can happen when client restarts but RNS reuses deterministic link_id)
if self.identity is not None:
# Reset session state and process as new HELLO
old_nick = sess.get("nick")
sess["welcomed"] = False
sess["rooms"] = set()
sess["nick"] = None
sess["peer_caps"] = {}
new_nick = None
# Process the HELLO message
if isinstance(nick, str):
n = normalize_nick(nick, max_chars=self.config.nick_max_chars)
if n is not None:
new_nick = n
sess["nick"] = n
if isinstance(body, dict):
sess["peer_caps"] = self._extract_caps(body)
if sess.get("nick") is None:
if new_nick is None:
legacy_nick = body.get(B_HELLO_NICK_LEGACY)
n2 = normalize_nick(
legacy_nick, max_chars=self.config.nick_max_chars
)
if n2 is not None:
new_nick = n2
sess["nick"] = n2
# Update nick index if nick changed
if old_nick != new_nick:
self._update_nick_index(link, old_nick, new_nick)
self.log.info(
"Re-HELLO peer=%s nick=%r link_id=%s",
@@ -3797,6 +4008,11 @@ class HubService:
# Client provided a nickname in this message - validate and preserve it
n = normalize_nick(incoming_nick, max_chars=self.config.nick_max_chars)
if n is not None:
# Update session nick and index if it changed
old_session_nick = sess.get("nick")
if old_session_nick != n:
sess["nick"] = n
self._update_nick_index(link, old_session_nick, n)
env[K_NICK] = n
else:
# Invalid nickname provided - remove it