Files
rrcd/rrcd/service.py
T

599 lines
21 KiB
Python

from __future__ import annotations
import logging
import os
import signal
import threading
import time
from typing import Any
import RNS
from .codec import encode
from .commands import CommandHandler
from .config import ConfigManager, HubRuntimeConfig
from .constants import (
T_PING,
)
from .envelope import make_envelope
from .logging_config import configure_logging
from .messages import MessageHelper
from .resources import ResourceManager
from .rooms import RoomManager
from .router import MessageRouter, OutgoingList
from .session import SessionManager
from .stats import StatsManager
from .trust import TrustManager
from .util import expand_path
class HubService:
def __init__(self, config: HubRuntimeConfig) -> None:
    """Set up the hub's in-memory state and its collaborating managers.

    Nothing is started here: the identity, the destination and the worker
    threads all stay ``None`` until ``start()`` fills them in.

    Args:
        config: Runtime configuration, kept as ``self.config``.
    """
    self.config = config
    self.log = logging.getLogger("rrcd.hub")

    # Re-entrant lock taken around shared state changes, and the event the
    # background loops poll to know when to exit.
    self._state_lock = threading.RLock()
    self._shutdown = threading.Event()

    # Assigned by start().
    self.identity: RNS.Identity | None = None
    self.destination: RNS.Destination | None = None

    # Handles of the background worker threads (None until started).
    self._prune_thread: threading.Thread | None = None
    self._ping_thread: threading.Thread | None = None
    self._announce_thread: threading.Thread | None = None
    self._resource_cleanup_thread: threading.Thread | None = None

    # Every manager is handed the hub itself; the construction order is
    # kept exactly as it was.
    self.router = MessageRouter(self)
    self.session_manager = SessionManager(self)
    self.command_handler = CommandHandler(self)
    self.resource_manager = ResourceManager(self)
    self.room_manager = RoomManager(self)
    self.stats_manager = StatsManager(self)
    self.trust_manager = TrustManager(self)
    self.config_manager = ConfigManager(self)
    self.message_helper = MessageHelper(self)
def start(self) -> None:
    """Bring the hub online.

    In order: record the start time (once), initialise Reticulum, load the
    hub identity, load the trusted/banned identity lists and the room
    registry, create the inbound destination and hook up the link
    callback, optionally announce, then start the background workers.

    Worker start-up is delegated to ``_ensure_worker_threads()`` so the
    announce/ping/prune threads go through the same "only if not already
    alive" check used on config reload; calling ``start()`` again no longer
    spawns duplicate workers.

    Raises:
        RuntimeError: ``identity_path`` is unset, or the identity cannot be
            loaded.
        ValueError: ``dest_name`` contains no non-empty component.
    """
    self.log.info("Starting Reticulum")
    if self.stats_manager.started_wall_time is None:
        self.stats_manager.set_start_time()
    RNS.Reticulum(configdir=self.config.configdir, require_shared_instance=False)

    if not self.config.identity_path:
        raise RuntimeError("identity_path is not set")
    self.identity = self._load_identity(self.config.identity_path)

    self.trust_manager.load_from_config(
        list(self.config.trusted_identities),
        list(self.config.banned_identities),
    )
    self._load_registered_rooms_from_registry()

    # "app.aspect1.aspect2" -> app name plus aspects; empty segments dropped.
    parts = [p for p in str(self.config.dest_name).split(".") if p]
    if not parts:
        raise ValueError("dest_name must not be empty")
    app_name, aspects = parts[0], parts[1:]
    self.destination = RNS.Destination(
        self.identity,
        RNS.Destination.IN,
        RNS.Destination.SINGLE,
        app_name,
        *aspects,
    )
    self.destination.set_link_established_callback(self._on_link)

    if self.config.announce_on_start:
        self._announce_once()

    self.log.info(
        "Hub running dest_name=%s dest_hash=%s",
        self.config.dest_name,
        self.destination.hash.hex() if self.destination else "-",
    )
    self.log.info(
        "Policy max_nick_bytes=%s max_rooms=%s max_room_name_bytes=%s rate_limit_msgs_per_minute=%s",
        self.config.max_nick_bytes,
        self.config.max_rooms_per_session,
        self.config.max_room_name_bytes,
        self.config.rate_limit_msgs_per_minute,
    )

    # Announce, ping and prune workers (each only if enabled in config and
    # not already running).
    self._ensure_worker_threads()

    # The resource cleanup worker is started here, with the same guard.
    cleanup = self._resource_cleanup_thread
    if self.config.enable_resource_transfer and (
        cleanup is None or not cleanup.is_alive()
    ):
        self._resource_cleanup_thread = threading.Thread(
            target=self._resource_cleanup_loop,
            name="rrcd-resource-cleanup",
            daemon=True,
        )
        self._resource_cleanup_thread.start()
def _announce_once(self) -> None:
if self.destination is None:
return
try:
self.destination.announce(
app_data=encode({"proto": "rrc", "v": 1, "hub": self.config.hub_name})
)
self.stats_manager.inc("announces")
except Exception:
self.log.exception("Announce failed")
def _announce_loop(self) -> None:
while not self._shutdown.is_set():
period = float(self.config.announce_period_s)
if period <= 0:
time.sleep(1.0)
continue
time.sleep(period)
if self._shutdown.is_set():
break
self._announce_once()
def run_forever(self) -> None:
    """Run the hub until shutdown is requested.

    Starts the hub first if no destination exists yet, installs SIGINT and
    SIGTERM handlers that call ``stop()``, then polls the shutdown event
    four times per second.
    """
    if self.destination is None:
        self.start()

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, lambda *_: self.stop())

    while True:
        if self._shutdown.is_set():
            return
        time.sleep(0.25)
def stop(self) -> None:
    """Signal shutdown, drop all tracked state and tear down every link.

    The shutdown event is set first so the background loops stop. Session,
    room and resource state is cleared under the state lock; the links
    returned by the session manager are torn down after the lock is
    released. Teardown is best-effort: a failing link is logged at debug
    level and does not prevent the remaining links from being torn down.
    """
    self._shutdown.set()
    with self._state_lock:
        links = self.session_manager.clear_all()
        self.room_manager.clear_all()
        self.resource_manager.clear_all()
    for link in links:
        try:
            link.teardown()
        except Exception:
            self.log.debug("Link teardown failed during shutdown", exc_info=True)
def _load_identity(self, path: str) -> RNS.Identity:
    """Load the hub identity from ``path`` (expanded via ``expand_path``).

    Raises:
        RuntimeError: the expanded path does not exist, or
            ``RNS.Identity.from_file`` returned ``None`` for it.
    """
    resolved = expand_path(path)
    if os.path.exists(resolved):
        identity = RNS.Identity.from_file(resolved)
        if identity is not None:
            return identity
        raise RuntimeError(f"Failed to load identity from {resolved}")
    raise RuntimeError(f"Identity not found at {resolved}")
def _parse_identity_hash(self, text: str) -> bytes:
s = str(text).strip().lower()
if s.startswith("0x"):
s = s[2:]
s = "".join(ch for ch in s if not ch.isspace())
try:
b = bytes.fromhex(s)
except Exception as e:
raise ValueError(f"invalid identity hash {text!r}: {e}") from e
if len(b) < 4:
raise ValueError(f"identity hash too short: {text!r}")
return b
def _ensure_worker_threads(self) -> None:
if self._announce_thread is None or not self._announce_thread.is_alive():
if (
self.config.announce_period_s
and float(self.config.announce_period_s) > 0
):
self._announce_thread = threading.Thread(
target=self._announce_loop,
name="rrcd-announce",
daemon=True,
)
self._announce_thread.start()
if self._ping_thread is None or not self._ping_thread.is_alive():
if self.config.ping_interval_s and float(self.config.ping_interval_s) > 0:
self._ping_thread = threading.Thread(
target=self._ping_loop, daemon=True
)
self._ping_thread.start()
if self._prune_thread is None or not self._prune_thread.is_alive():
if (
self.config.room_registry_prune_interval_s
and float(self.config.room_registry_prune_interval_s) > 0
and self.config.room_registry_prune_after_s
and float(self.config.room_registry_prune_after_s) > 0
):
self._prune_thread = threading.Thread(
target=self._prune_loop,
name="rrcd-room-prune",
daemon=True,
)
self._prune_thread.start()
def _fmt_hash(self, h: Any, *, prefix: int = 12) -> str:
if isinstance(h, (bytes, bytearray)):
s = bytes(h).hex()
return s if prefix <= 0 else s[: min(prefix, len(s))]
return "-"
def _fmt_link_id(self, link: RNS.Link) -> str:
lid = getattr(link, "link_id", None)
if isinstance(lid, (bytes, bytearray)):
return bytes(lid).hex()
h = getattr(link, "hash", None)
if isinstance(h, (bytes, bytearray)):
return bytes(h).hex()
return "-"
def _reload_config_and_rooms(
self,
link: RNS.Link,
room: str | None,
outgoing: list[tuple[RNS.Link, bytes]] | None = None,
) -> None:
cfg_path = self.config_manager.get_config_path_for_writes()
if not cfg_path or not os.path.exists(cfg_path):
self.message_helper.emit_notice(
outgoing, link, room, "reload failed: config_path not set or missing"
)
return
with self._state_lock:
old_cfg = self.config
old_trusted = set(self.trust_manager._trusted)
old_banned = set(self.trust_manager._banned)
old_registry = dict(self.room_manager._room_registry)
try:
data = self.config_manager.load_toml(cfg_path)
new_cfg = self.config_manager.apply_config_data(old_cfg, data)
except Exception as e:
self.message_helper.emit_notice(
outgoing, link, room, f"reload failed: config parse error: {e}"
)
return
try:
new_trusted = {
self._parse_identity_hash(h)
for h in (new_cfg.trusted_identities or ())
if str(h).strip()
}
new_banned = {
self._parse_identity_hash(h)
for h in (new_cfg.banned_identities or ())
if str(h).strip()
}
except Exception as e:
self.message_helper.emit_notice(
outgoing, link, room, f"reload failed: identity list parse error: {e}"
)
return
reg_path = (
expand_path(str(new_cfg.room_registry_path))
if new_cfg.room_registry_path
else ""
)
new_registry, reg_err = self.room_manager.load_registry_from_path(
reg_path,
invite_timeout_s=new_cfg.room_invite_timeout_s,
)
if reg_err is not None:
self.message_helper.emit_notice(
outgoing, link, room, f"reload failed: {reg_err}"
)
return
with self._state_lock:
self.config = new_cfg
self.trust_manager._trusted = new_trusted
self.trust_manager._banned = new_banned
self.room_manager._room_registry = new_registry
self.room_manager.merge_registry_into_state(new_registry)
self._ensure_worker_threads()
try:
configure_logging(self.config)
except Exception:
self.log.exception("Failed to reconfigure logging")
cfg_changes = self.config_manager.diff_config_summary(old_cfg, new_cfg)
room_changes = self.room_manager.diff_registry_summary(
old_registry, new_registry
)
lines: list[str] = []
lines.append(
f"reloaded: trusted={len(old_trusted)}->{len(new_trusted)} "
f"banned={len(old_banned)}->{len(new_banned)} "
f"registered_rooms={len(old_registry)}->{len(new_registry)}"
)
lines.append(f"policy: max_nick_bytes={new_cfg.max_nick_bytes}")
if cfg_changes:
lines.append("config_changes:")
preview = cfg_changes[:12]
lines.extend(f"- {x}" for x in preview)
if len(cfg_changes) > 12:
lines.append(f"- (+{len(cfg_changes) - 12} more)")
else:
lines.append("config_changes: (none)")
lines.append("rooms_changes:")
lines.extend(f"- {x}" for x in room_changes)
self.message_helper.emit_notice(outgoing, link, room, "\n".join(lines))
def _load_registered_rooms_from_registry(self) -> None:
reg_path = self.room_manager.get_registry_path_for_writes()
if not reg_path:
return
registry, err = self.room_manager.load_registry_from_path(
reg_path, invite_timeout_s=self.config.room_invite_timeout_s
)
if err is not None:
return
self.room_manager._room_registry = registry
def _resolve_identity_hash(
self, token: str, *, room: str | None = None
) -> bytes | None:
"""Resolve token to identity hash. Returns hash if successful, None otherwise.
For ambiguous matches, use _resolve_identity_hash_with_matches instead.
"""
target_link = self.command_handler._find_target_link(token, room=room)
if target_link is not None:
s = self.session_manager.sessions.get(target_link)
ph = s.get("peer") if s else None
if isinstance(ph, (bytes, bytearray)):
return bytes(ph)
try:
return self._parse_identity_hash(token)
except Exception:
return None
def _resolve_identity_hash_with_matches(
self, token: str, *, room: str | None = None
) -> tuple[bytes | None, list[RNS.Link]]:
"""Resolve token to identity hash, also returning all matching links.
Returns (hash, matches) tuple. Hash is None if ambiguous or not found.
Use matches list to provide helpful error messages.
"""
matches = self.command_handler._find_target_links(token, room=room)
if len(matches) == 1:
s = self.session_manager.sessions.get(matches[0])
ph = s.get("peer") if s else None
if isinstance(ph, (bytes, bytearray)):
return (bytes(ph), matches)
elif len(matches) > 1:
return (None, matches)
try:
h = self._parse_identity_hash(token)
return (h, [])
except Exception:
return (None, [])
def _resource_cleanup_loop(self) -> None:
"""Periodically cleanup expired resource expectations."""
while not self._shutdown.is_set():
time.sleep(30.0)
if self._shutdown.is_set():
break
try:
self.resource_manager.cleanup_all_expired_expectations()
except Exception:
self.log.exception("Resource cleanup failed")
def _prune_loop(self) -> None:
"""Periodically prune unused registered rooms."""
while not self._shutdown.is_set():
interval = float(self.config.room_registry_prune_interval_s)
prune_after = float(self.config.room_registry_prune_after_s)
if interval <= 0 or prune_after <= 0:
time.sleep(1.0)
continue
time.sleep(interval)
if self._shutdown.is_set():
break
rooms_to_prune: list[str] = []
dummy_link: RNS.Link | None = None
with self._state_lock:
dummy_link = next(iter(self.session_manager.sessions.keys()), None)
rooms_to_prune = self.room_manager.prune_unused_registered_rooms(
prune_after, self.stats_manager.started_wall_time or time.time()
)
if dummy_link is not None:
for room in rooms_to_prune:
self.room_manager.delete_room_from_registry(dummy_link, room)
for room in rooms_to_prune:
self.log.info("Pruned unused registered room %s", room)
def _on_link(self, link: RNS.Link) -> None:
with self._state_lock:
self.session_manager.on_link_established(link)
self.resource_manager.on_link_established(link)
link.set_packet_callback(lambda data, pkt: self._on_packet(link, data))
link.set_link_closed_callback(lambda closed_link: self._on_close(closed_link))
link.set_remote_identified_callback(
lambda identified_link, ident: self._on_remote_identified(
identified_link, ident
)
)
self.resource_manager.configure_link_callbacks(link)
self.log.info("Link established link_id=%s", self._fmt_link_id(link))
def _on_remote_identified(
self, link: RNS.Link, identity: RNS.Identity | None
) -> None:
banned = False
peer_hash = None
with self._state_lock:
banned, peer_hash = self.session_manager.on_remote_identified(
link, identity
)
if banned:
self.log.warning(
"Disconnecting banned peer peer=%s link_id=%s",
self._fmt_hash(peer_hash),
self._fmt_link_id(link),
)
if self.identity is not None:
try:
self.message_helper.error(
link, src=self.identity.hash, text="banned"
)
except Exception:
pass
try:
link.teardown()
except Exception:
pass
def _on_close(self, link: RNS.Link) -> None:
peer = None
nick = None
rooms_count = 0
with self._state_lock:
self.resource_manager.on_link_closed(link)
peer, nick, rooms_count = self.session_manager.on_link_closed(link)
self.log.info(
"Link closed peer=%s nick=%r rooms=%s link_id=%s",
self._fmt_hash(peer),
nick,
rooms_count,
self._fmt_link_id(link),
)
def _norm_room(self, room: str) -> str:
r = room.strip().lower()
if not r:
raise ValueError("room name must not be empty")
# Check UTF-8 byte length
room_bytes = len(r.encode("utf-8", errors="replace"))
if room_bytes > int(self.config.max_room_name_bytes):
raise ValueError(
f"room name too long: {room_bytes} bytes > {self.config.max_room_name_bytes} bytes"
)
return r
def _on_packet(self, link: RNS.Link, data: bytes) -> None:
    """Route an incoming packet and send whatever responses it produced.

    Routing runs under the state lock and fills ``outgoing`` with
    ``(link, payload)`` pairs. Sending happens after the lock is released.
    Every payload is counted in "bytes_out" before its send is attempted;
    a failed send is logged (warning for ``OSError``, debug otherwise) and
    the remaining payloads are still sent. Finally, any callbacks stored on
    the outgoing list as ``_post_send_callbacks`` are run.
    """
    outgoing: list[tuple[RNS.Link, bytes]] = OutgoingList()
    with self._state_lock:
        self.router.route_packet(link, data, outgoing)

    if self.log.isEnabledFor(logging.DEBUG) and outgoing:
        self.log.debug(
            "Sending %d response(s) link_id=%s",
            len(outgoing),
            self._fmt_link_id(link),
        )

    for out_link, payload in outgoing:
        size = len(payload)
        self.stats_manager.inc("bytes_out", size)
        try:
            RNS.Packet(out_link, payload).send()
        except OSError as e:
            self.log.warning(
                "Send failed link_id=%s bytes=%s err=%s",
                self._fmt_link_id(out_link),
                size,
                e,
            )
        except Exception:
            self.log.debug(
                "Send failed link_id=%s bytes=%s",
                self._fmt_link_id(out_link),
                size,
                exc_info=True,
            )

    # A list without the attribute simply has no post-send work.
    for callback in getattr(outgoing, "_post_send_callbacks", ()):
        try:
            callback()
        except Exception:
            self.log.exception("Post-send callback failed")
def _ping_loop(self) -> None:
while not self._shutdown.is_set():
interval = float(self.config.ping_interval_s)
timeout = float(self.config.ping_timeout_s)
if interval <= 0:
time.sleep(1.0)
continue
time.sleep(interval)
if self.identity is None:
continue
now = time.monotonic()
to_teardown: list[RNS.Link] = []
to_ping: list[RNS.Link] = []
with self._state_lock:
for link, sess in list(self.session_manager.sessions.items()):
if not sess.get("welcomed"):
continue
awaiting = sess.get("awaiting_pong")
if (
timeout > 0
and awaiting is not None
and (now - float(awaiting)) > timeout
):
to_teardown.append(link)
continue
if awaiting is None:
sess["awaiting_pong"] = now
to_ping.append(link)
for link in to_teardown:
try:
link.teardown()
except Exception:
pass
for link in to_ping:
ping = make_envelope(T_PING, src=self.identity.hash, body=now)
try:
self.stats_manager.inc("pings_out")
self.message_helper.send(link, ping)
except Exception:
pass