diff --git a/rrcd/commands.py b/rrcd/commands.py index a1917b5..7e5b222 100644 --- a/rrcd/commands.py +++ b/rrcd/commands.py @@ -72,7 +72,7 @@ class CommandHandler: ) return True # Send response without room field for hub-level command - self._emit_notice(outgoing, link, None, self.hub._format_stats()) + self._emit_notice(outgoing, link, None, self.hub.stats_manager.format_stats()) return True if cmd == "list": @@ -1099,7 +1099,7 @@ class CommandHandler: text: str, room: str | None = None, ) -> None: - self.hub._inc("errors_sent") + self.hub.stats_manager.inc("errors_sent") env = make_envelope(T_ERROR, src=src, room=room, body=text) if outgoing is None: self.hub._send(link, env) diff --git a/rrcd/resources.py b/rrcd/resources.py index ae9e082..38f4879 100644 --- a/rrcd/resources.py +++ b/rrcd/resources.py @@ -252,7 +252,7 @@ class ResourceManager: "Rejecting resource (disabled) link_id=%s", self.hub._fmt_link_id(link), ) - self.hub._inc("resources_rejected") + self.hub.stats_manager.inc("resources_rejected") return False # Check size limit (immutable config) @@ -264,7 +264,7 @@ class ResourceManager: self.hub.config.max_resource_bytes, self.hub._fmt_link_id(link), ) - self.hub._inc("resources_rejected") + self.hub.stats_manager.inc("resources_rejected") return False # Check session exists and find expectation with minimal lock scope @@ -275,7 +275,7 @@ class ResourceManager: "Rejecting resource (no session) link_id=%s", self.hub._fmt_link_id(link), ) - self.hub._inc("resources_rejected") + self.hub.stats_manager.inc("resources_rejected") return False # Find matching expectation @@ -288,7 +288,7 @@ class ResourceManager: self.hub._fmt_link_id(link), size, ) - self.hub._inc("resources_rejected") + self.hub.stats_manager.inc("resources_rejected") return False # Accept and register with minimal lock scope @@ -365,8 +365,8 @@ class ResourceManager: # Pop expectation only after validation succeeds. self.pop_resource_expectation(link, exp.id) - self.hub._inc("resources_received") - self.hub._inc("resource_bytes_received", size) + self.hub.stats_manager.inc("resources_received") + self.hub.stats_manager.inc("resource_bytes_received", size) self.log.info( "Resource received link_id=%s size=%s kind=%s", @@ -442,7 +442,7 @@ class ResourceManager: ) if forwarded > 0: - self.hub._inc("notices_forwarded") + self.hub.stats_manager.inc("notices_forwarded") self.log.debug( "Forwarded NOTICE resource to %d members room=%s", forwarded, @@ -540,7 +540,7 @@ class ResourceManager: try: envelope_payload = encode(envelope) RNS.Packet(link, envelope_payload).send() - self.hub._inc("bytes_out", len(envelope_payload)) + self.hub.stats_manager.inc("bytes_out", len(envelope_payload)) self.log.debug( "Sent resource envelope link_id=%s rid=%s kind=%s size=%s", @@ -566,8 +566,8 @@ class ResourceManager: with self.hub._state_lock: self._active_resources.setdefault(link, set()).add(resource) - self.hub._inc("resources_sent") - self.hub._inc("resource_bytes_sent", size) + self.hub.stats_manager.inc("resources_sent") + self.hub.stats_manager.inc("resource_bytes_sent", size) self.log.info( "Sent resource link_id=%s rid=%s kind=%s size=%s", diff --git a/rrcd/router.py b/rrcd/router.py index 30f1614..2f19534 100644 --- a/rrcd/router.py +++ b/rrcd/router.py @@ -74,8 +74,8 @@ class MessageRouter: if sess is None: return - self.hub._inc("pkts_in") - self.hub._inc("bytes_in", len(data)) + self.hub.stats_manager.inc("pkts_in") + self.hub.stats_manager.inc("bytes_in", len(data)) peer_hash = sess.get("peer") if peer_hash is None: @@ -88,7 +88,7 @@ class MessageRouter: sess["peer"] = peer_hash if not self.hub._refill_and_take(link, 1.0): - self.hub._inc("rate_limited") + self.hub.stats_manager.inc("rate_limited") if self.log.isEnabledFor(logging.DEBUG): self.log.debug( "Rate limited peer=%s link_id=%s", @@ -105,7 +105,7 @@ class MessageRouter: env = decode(data) validate_envelope(env) except Exception as e: - self.hub._inc("pkts_bad") + self.hub.stats_manager.inc("pkts_bad") self.log.debug( "Bad packet peer=%s link_id=%s bytes=%s err=%s", self.hub._fmt_hash(peer_hash), @@ -160,7 +160,7 @@ class MessageRouter: def _handle_pong(self, link: RNS.Link, sess: dict[str, Any]) -> None: """Handle PONG message.""" - self.hub._inc("pongs_in") + self.hub.stats_manager.inc("pongs_in") sess["awaiting_pong"] = None def _handle_resource_envelope( @@ -437,7 +437,7 @@ class MessageRouter: room = env.get(K_ROOM) body = env.get(K_BODY) - self.hub._inc("joins") + self.hub.stats_manager.inc("joins") if not isinstance(room, str) or not room: if self.hub.identity is not None: self.hub._emit_error( @@ -582,7 +582,7 @@ class MessageRouter: """Handle PART message.""" room = env.get(K_ROOM) - self.hub._inc("parts") + self.hub.stats_manager.inc("parts") if not isinstance(room, str) or not room: if self.hub.identity is not None: self.hub._emit_error( @@ -815,9 +815,9 @@ class MessageRouter: ) if t == T_MSG: - self.hub._inc("msgs_forwarded") + self.hub.stats_manager.inc("msgs_forwarded") else: - self.hub._inc("notices_forwarded") + self.hub.stats_manager.inc("notices_forwarded") def _handle_ping( self, @@ -828,10 +828,10 @@ class MessageRouter: """Handle PING message.""" body = env.get(K_BODY) - self.hub._inc("pings_in") + self.hub.stats_manager.inc("pings_in") if self.hub.identity is not None: pong = make_envelope(T_PONG, src=self.hub.identity.hash, body=body) - self.hub._inc("pongs_out") + self.hub.stats_manager.inc("pongs_out") self.hub._queue_env(outgoing, link, pong) def _extract_caps(self, body: Any) -> dict[int, Any]: diff --git a/rrcd/service.py b/rrcd/service.py index 93e39f5..cb33d7a 100644 --- a/rrcd/service.py +++ b/rrcd/service.py @@ -30,6 +30,7 @@ from .resources import ResourceManager from .rooms import RoomManager from .router import MessageRouter, OutgoingList from .session import SessionManager +from .stats import StatsManager from .util import expand_path @@ -59,6 +60,9 @@ class HubService: # Room manager for room memberships and permissions self.room_manager = RoomManager(self) + + # Stats manager for metrics and reporting + self.stats_manager = StatsManager(self) self.identity: RNS.Identity | None = None self.destination: RNS.Destination | None = None @@ -75,33 +79,6 @@ class HubService: self._config_write_lock = threading.Lock() - self._started_wall_time: float | None = None - self._started_monotonic: float | None = None - # Lifetime counters for uptime statistics (monotonically increasing after startup). - # Python int has arbitrary precision, so overflow is not a concern. - self._counters: dict[str, int] = { - "bytes_in": 0, - "bytes_out": 0, - "pkts_in": 0, - "pkts_bad": 0, - "rate_limited": 0, - "errors_sent": 0, - "joins": 0, - "parts": 0, - "msgs_forwarded": 0, - "notices_forwarded": 0, - "pings_in": 0, - "pongs_in": 0, - "pings_out": 0, - "pongs_out": 0, - "announces": 0, - "resources_sent": 0, - "resources_received": 0, - "resources_rejected": 0, - "resource_bytes_sent": 0, - "resource_bytes_received": 0, - } - def _fmt_hash(self, h: Any, *, prefix: int = 12) -> str: @@ -217,13 +194,6 @@ class HubService: self._fmt_link_id(link), ) - def _inc(self, key: str, delta: int = 1) -> None: - try: - with self._state_lock: - self._counters[key] = int(self._counters.get(key, 0)) + int(delta) - except Exception: - pass - def _update_nick_index(self, link: RNS.Link, old_nick: str | None, new_nick: str | None) -> None: """Update nick index when a nick changes. Delegates to SessionManager.""" self.session_manager.update_nick_index(link, old_nick, new_nick) @@ -332,7 +302,7 @@ class HubService: outgoing = [] self._queue_notice_chunks(outgoing, link, room=room, text=text) for out_link, chunk_payload in outgoing: - self._inc("bytes_out", len(chunk_payload)) + self.stats_manager.inc("bytes_out", len(chunk_payload)) try: RNS.Packet(out_link, chunk_payload).send() except Exception as e: @@ -353,10 +323,8 @@ class HubService: def start(self) -> None: self.log.info("Starting Reticulum") - if self._started_wall_time is None: - self._started_wall_time = time.time() - if self._started_monotonic is None: - self._started_monotonic = time.monotonic() + if self.stats_manager.started_wall_time is None: + self.stats_manager.set_start_time() RNS.Reticulum(configdir=self.config.configdir, require_shared_instance=False) if not self.config.identity_path: @@ -443,7 +411,7 @@ class HubService: self.destination.announce( app_data=encode({"proto": "rrc", "v": 1, "hub": self.config.hub_name}) ) - self._inc("announces") + self.stats_manager.inc("announces") except Exception: self.log.exception("Announce failed") @@ -822,7 +790,7 @@ class HubService: with self._state_lock: dummy_link = next(iter(self.session_manager.sessions.keys()), None) rooms_to_prune = self.room_manager.prune_unused_registered_rooms( - prune_after, self._started_wall_time or time.time() + prune_after, self.stats_manager.started_wall_time or time.time() ) if dummy_link is not None: @@ -923,7 +891,7 @@ class HubService: def _queue_payload( self, outgoing: list[tuple[RNS.Link, bytes]], link: RNS.Link, payload: bytes ) -> None: - self._inc("bytes_out", len(payload)) + self.stats_manager.inc("bytes_out", len(payload)) outgoing.append((link, payload)) def _queue_env( @@ -956,98 +924,13 @@ class HubService: text: str, room: str | None = None, ) -> None: - self._inc("errors_sent") + self.stats_manager.inc("errors_sent") env = make_envelope(T_ERROR, src=src, room=room, body=text) if outgoing is None: self._send(link, env) else: self._queue_env(outgoing, link, env) - def _format_stats(self) -> str: - now_mono = time.monotonic() - started_mono = self._started_monotonic - uptime_s = (now_mono - started_mono) if started_mono is not None else 0.0 - - with self._state_lock: - session_stats = self.session_manager.get_stats() - sessions_total = session_stats["total"] - sessions_welcomed = session_stats["welcomed"] - sessions_identified = session_stats["identified"] - - room_stats = self.room_manager.get_stats() - rooms_total = room_stats["rooms_total"] - memberships = room_stats["memberships"] - top_rooms = room_stats["top_rooms"] - - trusted_count = len(self._trusted) - banned_count = len(self._banned) - c = dict(self._counters) - - lines: list[str] = [] - lines.append(f"rrcd {__version__} stats") - lines.append(f"uptime_s={uptime_s:.1f}") - lines.append( - f"clients_total={sessions_total} " - f"clients_identified={sessions_identified} " - f"clients_welcomed={sessions_welcomed}" - ) - lines.append(f"rooms={rooms_total} memberships={memberships}") - - if top_rooms: - lines.append("top_rooms=" + ", ".join(f"{r}:{n}" for r, n in top_rooms)) - - lines.append(f"trust: trusted={trusted_count} banned={banned_count}") - lines.append( - f"limits: rate_limit_msgs_per_minute={self.config.rate_limit_msgs_per_minute} " - f"max_rooms_per_session={self.config.max_rooms_per_session} " - f"max_room_name_len={self.config.max_room_name_len} " - f"nick_max_chars={self.config.nick_max_chars}" - ) - lines.append( - f"features: ping_interval_s={self.config.ping_interval_s} " - f"ping_timeout_s={self.config.ping_timeout_s} " - f"announce_on_start={self.config.announce_on_start} " - f"announce_period_s={self.config.announce_period_s}" - ) - - lines.append( - "io: pkts_in={} pkts_bad={} bytes_in={} bytes_out={}".format( - c.get("pkts_in", 0), - c.get("pkts_bad", 0), - c.get("bytes_in", 0), - c.get("bytes_out", 0), - ) - ) - lines.append( - "events: joins={} parts={} msgs_fwd={} notices_fwd={} errors_sent={} rate_limited={}".format( - c.get("joins", 0), - c.get("parts", 0), - c.get("msgs_forwarded", 0), - c.get("notices_forwarded", 0), - c.get("errors_sent", 0), - c.get("rate_limited", 0), - ) - ) - lines.append( - "pings: in={} out={} pongs: in={} out={}".format( - c.get("pings_in", 0), - c.get("pings_out", 0), - c.get("pongs_in", 0), - c.get("pongs_out", 0), - ) - ) - lines.append( - "resources: sent={} received={} rejected={} bytes_sent={} bytes_received={}".format( - c.get("resources_sent", 0), - c.get("resources_received", 0), - c.get("resources_rejected", 0), - c.get("resource_bytes_sent", 0), - c.get("resource_bytes_received", 0), - ) - ) - - return "".join(lines) - def _on_link(self, link: RNS.Link) -> None: with self._state_lock: self.session_manager.on_link_established(link) @@ -1107,7 +990,7 @@ class HubService: # Send queued WELCOME first for out_link, payload in outgoing: - self._inc("bytes_out", len(payload)) + self.stats_manager.inc("bytes_out", len(payload)) try: RNS.Packet(out_link, payload).send() except OSError as e: @@ -1160,7 +1043,7 @@ class HubService: def _send(self, link: RNS.Link, env: dict) -> None: payload = encode(env) - self._inc("bytes_out", len(payload)) + self.stats_manager.inc("bytes_out", len(payload)) try: RNS.Packet(link, payload).send() except OSError as e: @@ -1212,7 +1095,7 @@ class HubService: ) for out_link, payload in outgoing: - self._inc("bytes_out", len(payload)) + self.stats_manager.inc("bytes_out", len(payload)) try: RNS.Packet(out_link, payload).send() except OSError as e: @@ -1294,7 +1177,7 @@ class HubService: for link in to_ping: ping = make_envelope(T_PING, src=self.identity.hash, body=now) try: - self._inc("pings_out") + self.stats_manager.inc("pings_out") self._send(link, ping) except Exception: pass diff --git a/rrcd/stats.py b/rrcd/stats.py new file mode 100644 index 0000000..4548e81 --- /dev/null +++ b/rrcd/stats.py @@ -0,0 +1,160 @@ +"""Statistics tracking and reporting for the RRC hub.""" + +from __future__ import annotations + +import threading +import time +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .service import HubService + + +class StatsManager: + """ + Manages hub statistics collection and reporting. + + Tracks counters for: + - Bytes in/out + - Packets processed + - Rate limiting events + - Errors sent + - Room joins/parts + - Messages forwarded + - Ping/pong activity + - Announces + - Resource transfers + """ + + def __init__(self, hub: HubService) -> None: + self.hub = hub + self.log = hub.log + + self.started_wall_time: float | None = None + self.started_monotonic: float | None = None + + # Lifetime counters for uptime statistics (monotonically increasing after startup). + # Python int has arbitrary precision, so overflow is not a concern. + self._counters: dict[str, int] = { + "bytes_in": 0, + "bytes_out": 0, + "pkts_in": 0, + "pkts_bad": 0, + "rate_limited": 0, + "errors_sent": 0, + "joins": 0, + "parts": 0, + "msgs_forwarded": 0, + "notices_forwarded": 0, + "pings_in": 0, + "pongs_in": 0, + "pings_out": 0, + "pongs_out": 0, + "announces": 0, + "resources_sent": 0, + "resources_received": 0, + "resources_rejected": 0, + "resource_bytes_sent": 0, + "resource_bytes_received": 0, + } + + def set_start_time(self) -> None: + """Set the start time for uptime calculations.""" + self.started_wall_time = time.time() + self.started_monotonic = time.monotonic() + + def inc(self, key: str, delta: int = 1) -> None: + """Increment a counter by the given delta.""" + try: + with self.hub._state_lock: + self._counters[key] = int(self._counters.get(key, 0)) + int(delta) + except Exception: + pass + + def format_stats(self) -> str: + """Format current statistics as a human-readable string.""" + from . import __version__ + + now_mono = time.monotonic() + started_mono = self.started_monotonic + uptime_s = (now_mono - started_mono) if started_mono is not None else 0.0 + + with self.hub._state_lock: + session_stats = self.hub.session_manager.get_stats() + sessions_total = session_stats["total"] + sessions_welcomed = session_stats["welcomed"] + sessions_identified = session_stats["identified"] + + room_stats = self.hub.room_manager.get_stats() + rooms_total = room_stats["rooms_total"] + memberships = room_stats["memberships"] + top_rooms = room_stats["top_rooms"] + + trusted_count = len(self.hub._trusted) + banned_count = len(self.hub._banned) + c = dict(self._counters) + + lines: list[str] = [] + lines.append(f"rrcd {__version__} stats") + lines.append(f"uptime_s={uptime_s:.1f}") + lines.append( + f"clients_total={sessions_total} " + f"clients_identified={sessions_identified} " + f"clients_welcomed={sessions_welcomed}" + ) + lines.append(f"rooms={rooms_total} memberships={memberships}") + + if top_rooms: + lines.append("top_rooms=" + ", ".join(f"{r}:{n}" for r, n in top_rooms)) + + lines.append(f"trust: trusted={trusted_count} banned={banned_count}") + lines.append( + f"limits: rate_limit_msgs_per_minute={self.hub.config.rate_limit_msgs_per_minute} " + f"max_rooms_per_session={self.hub.config.max_rooms_per_session} " + f"max_room_name_len={self.hub.config.max_room_name_len} " + f"nick_max_chars={self.hub.config.nick_max_chars}" + ) + lines.append( + f"features: ping_interval_s={self.hub.config.ping_interval_s} " + f"ping_timeout_s={self.hub.config.ping_timeout_s} " + f"announce_on_start={self.hub.config.announce_on_start} " + f"announce_period_s={self.hub.config.announce_period_s}" + ) + + lines.append( + "io: pkts_in={} pkts_bad={} bytes_in={} bytes_out={}".format( + c.get("pkts_in", 0), + c.get("pkts_bad", 0), + c.get("bytes_in", 0), + c.get("bytes_out", 0), + ) + ) + lines.append( + "events: joins={} parts={} msgs_fwd={} notices_fwd={} errors_sent={} rate_limited={}".format( + c.get("joins", 0), + c.get("parts", 0), + c.get("msgs_forwarded", 0), + c.get("notices_forwarded", 0), + c.get("errors_sent", 0), + c.get("rate_limited", 0), + ) + ) + lines.append( + "pings: in={} out={} pongs: in={} out={}".format( + c.get("pings_in", 0), + c.get("pings_out", 0), + c.get("pongs_in", 0), + c.get("pongs_out", 0), + ) + ) + lines.append( + "resources: sent={} received={} rejected={} bytes_sent={} bytes_received={}".format( + c.get("resources_sent", 0), + c.get("resources_received", 0), + c.get("resources_rejected", 0), + c.get("resource_bytes_sent", 0), + c.get("resource_bytes_received", 0), + ) + ) + + return "".join(lines)