refactor stats management

This commit is contained in:
kc1awv
2026-01-07 11:41:02 -05:00
parent 7e5675fdd7
commit 33e6b72bf7
5 changed files with 198 additions and 155 deletions
+2 -2
View File
@@ -72,7 +72,7 @@ class CommandHandler:
)
return True
# Send response without room field for hub-level command
self._emit_notice(outgoing, link, None, self.hub._format_stats())
self._emit_notice(outgoing, link, None, self.hub.stats_manager.format_stats())
return True
if cmd == "list":
@@ -1099,7 +1099,7 @@ class CommandHandler:
text: str,
room: str | None = None,
) -> None:
self.hub._inc("errors_sent")
self.hub.stats_manager.inc("errors_sent")
env = make_envelope(T_ERROR, src=src, room=room, body=text)
if outgoing is None:
self.hub._send(link, env)
+10 -10
View File
@@ -252,7 +252,7 @@ class ResourceManager:
"Rejecting resource (disabled) link_id=%s",
self.hub._fmt_link_id(link),
)
self.hub._inc("resources_rejected")
self.hub.stats_manager.inc("resources_rejected")
return False
# Check size limit (immutable config)
@@ -264,7 +264,7 @@ class ResourceManager:
self.hub.config.max_resource_bytes,
self.hub._fmt_link_id(link),
)
self.hub._inc("resources_rejected")
self.hub.stats_manager.inc("resources_rejected")
return False
# Check session exists and find expectation with minimal lock scope
@@ -275,7 +275,7 @@ class ResourceManager:
"Rejecting resource (no session) link_id=%s",
self.hub._fmt_link_id(link),
)
self.hub._inc("resources_rejected")
self.hub.stats_manager.inc("resources_rejected")
return False
# Find matching expectation
@@ -288,7 +288,7 @@ class ResourceManager:
self.hub._fmt_link_id(link),
size,
)
self.hub._inc("resources_rejected")
self.hub.stats_manager.inc("resources_rejected")
return False
# Accept and register with minimal lock scope
@@ -365,8 +365,8 @@ class ResourceManager:
# Pop expectation only after validation succeeds.
self.pop_resource_expectation(link, exp.id)
self.hub._inc("resources_received")
self.hub._inc("resource_bytes_received", size)
self.hub.stats_manager.inc("resources_received")
self.hub.stats_manager.inc("resource_bytes_received", size)
self.log.info(
"Resource received link_id=%s size=%s kind=%s",
@@ -442,7 +442,7 @@ class ResourceManager:
)
if forwarded > 0:
self.hub._inc("notices_forwarded")
self.hub.stats_manager.inc("notices_forwarded")
self.log.debug(
"Forwarded NOTICE resource to %d members room=%s",
forwarded,
@@ -540,7 +540,7 @@ class ResourceManager:
try:
envelope_payload = encode(envelope)
RNS.Packet(link, envelope_payload).send()
self.hub._inc("bytes_out", len(envelope_payload))
self.hub.stats_manager.inc("bytes_out", len(envelope_payload))
self.log.debug(
"Sent resource envelope link_id=%s rid=%s kind=%s size=%s",
@@ -566,8 +566,8 @@ class ResourceManager:
with self.hub._state_lock:
self._active_resources.setdefault(link, set()).add(resource)
self.hub._inc("resources_sent")
self.hub._inc("resource_bytes_sent", size)
self.hub.stats_manager.inc("resources_sent")
self.hub.stats_manager.inc("resource_bytes_sent", size)
self.log.info(
"Sent resource link_id=%s rid=%s kind=%s size=%s",
+11 -11
View File
@@ -74,8 +74,8 @@ class MessageRouter:
if sess is None:
return
self.hub._inc("pkts_in")
self.hub._inc("bytes_in", len(data))
self.hub.stats_manager.inc("pkts_in")
self.hub.stats_manager.inc("bytes_in", len(data))
peer_hash = sess.get("peer")
if peer_hash is None:
@@ -88,7 +88,7 @@ class MessageRouter:
sess["peer"] = peer_hash
if not self.hub._refill_and_take(link, 1.0):
self.hub._inc("rate_limited")
self.hub.stats_manager.inc("rate_limited")
if self.log.isEnabledFor(logging.DEBUG):
self.log.debug(
"Rate limited peer=%s link_id=%s",
@@ -105,7 +105,7 @@ class MessageRouter:
env = decode(data)
validate_envelope(env)
except Exception as e:
self.hub._inc("pkts_bad")
self.hub.stats_manager.inc("pkts_bad")
self.log.debug(
"Bad packet peer=%s link_id=%s bytes=%s err=%s",
self.hub._fmt_hash(peer_hash),
@@ -160,7 +160,7 @@ class MessageRouter:
def _handle_pong(self, link: RNS.Link, sess: dict[str, Any]) -> None:
"""Handle PONG message."""
self.hub._inc("pongs_in")
self.hub.stats_manager.inc("pongs_in")
sess["awaiting_pong"] = None
def _handle_resource_envelope(
@@ -437,7 +437,7 @@ class MessageRouter:
room = env.get(K_ROOM)
body = env.get(K_BODY)
self.hub._inc("joins")
self.hub.stats_manager.inc("joins")
if not isinstance(room, str) or not room:
if self.hub.identity is not None:
self.hub._emit_error(
@@ -582,7 +582,7 @@ class MessageRouter:
"""Handle PART message."""
room = env.get(K_ROOM)
self.hub._inc("parts")
self.hub.stats_manager.inc("parts")
if not isinstance(room, str) or not room:
if self.hub.identity is not None:
self.hub._emit_error(
@@ -815,9 +815,9 @@ class MessageRouter:
)
if t == T_MSG:
self.hub._inc("msgs_forwarded")
self.hub.stats_manager.inc("msgs_forwarded")
else:
self.hub._inc("notices_forwarded")
self.hub.stats_manager.inc("notices_forwarded")
def _handle_ping(
self,
@@ -828,10 +828,10 @@ class MessageRouter:
"""Handle PING message."""
body = env.get(K_BODY)
self.hub._inc("pings_in")
self.hub.stats_manager.inc("pings_in")
if self.hub.identity is not None:
pong = make_envelope(T_PONG, src=self.hub.identity.hash, body=body)
self.hub._inc("pongs_out")
self.hub.stats_manager.inc("pongs_out")
self.hub._queue_env(outgoing, link, pong)
def _extract_caps(self, body: Any) -> dict[int, Any]:
+15 -132
View File
@@ -30,6 +30,7 @@ from .resources import ResourceManager
from .rooms import RoomManager
from .router import MessageRouter, OutgoingList
from .session import SessionManager
from .stats import StatsManager
from .util import expand_path
@@ -59,6 +60,9 @@ class HubService:
# Room manager for room memberships and permissions
self.room_manager = RoomManager(self)
# Stats manager for metrics and reporting
self.stats_manager = StatsManager(self)
self.identity: RNS.Identity | None = None
self.destination: RNS.Destination | None = None
@@ -75,33 +79,6 @@ class HubService:
self._config_write_lock = threading.Lock()
self._started_wall_time: float | None = None
self._started_monotonic: float | None = None
# Lifetime counters for uptime statistics (monotonically increasing after startup).
# Python int has arbitrary precision, so overflow is not a concern.
self._counters: dict[str, int] = {
"bytes_in": 0,
"bytes_out": 0,
"pkts_in": 0,
"pkts_bad": 0,
"rate_limited": 0,
"errors_sent": 0,
"joins": 0,
"parts": 0,
"msgs_forwarded": 0,
"notices_forwarded": 0,
"pings_in": 0,
"pongs_in": 0,
"pings_out": 0,
"pongs_out": 0,
"announces": 0,
"resources_sent": 0,
"resources_received": 0,
"resources_rejected": 0,
"resource_bytes_sent": 0,
"resource_bytes_received": 0,
}
def _fmt_hash(self, h: Any, *, prefix: int = 12) -> str:
@@ -217,13 +194,6 @@ class HubService:
self._fmt_link_id(link),
)
def _inc(self, key: str, delta: int = 1) -> None:
try:
with self._state_lock:
self._counters[key] = int(self._counters.get(key, 0)) + int(delta)
except Exception:
pass
def _update_nick_index(self, link: RNS.Link, old_nick: str | None, new_nick: str | None) -> None:
"""Update the nick index when a nick changes.

Thin delegate: forwards ``link``, ``old_nick`` and ``new_nick`` unchanged to
``self.session_manager.update_nick_index`` and returns nothing.
"""
self.session_manager.update_nick_index(link, old_nick, new_nick)
@@ -332,7 +302,7 @@ class HubService:
outgoing = []
self._queue_notice_chunks(outgoing, link, room=room, text=text)
for out_link, chunk_payload in outgoing:
self._inc("bytes_out", len(chunk_payload))
self.stats_manager.inc("bytes_out", len(chunk_payload))
try:
RNS.Packet(out_link, chunk_payload).send()
except Exception as e:
@@ -353,10 +323,8 @@ class HubService:
def start(self) -> None:
self.log.info("Starting Reticulum")
if self._started_wall_time is None:
self._started_wall_time = time.time()
if self._started_monotonic is None:
self._started_monotonic = time.monotonic()
if self.stats_manager.started_wall_time is None:
self.stats_manager.set_start_time()
RNS.Reticulum(configdir=self.config.configdir, require_shared_instance=False)
if not self.config.identity_path:
@@ -443,7 +411,7 @@ class HubService:
self.destination.announce(
app_data=encode({"proto": "rrc", "v": 1, "hub": self.config.hub_name})
)
self._inc("announces")
self.stats_manager.inc("announces")
except Exception:
self.log.exception("Announce failed")
@@ -822,7 +790,7 @@ class HubService:
with self._state_lock:
dummy_link = next(iter(self.session_manager.sessions.keys()), None)
rooms_to_prune = self.room_manager.prune_unused_registered_rooms(
prune_after, self._started_wall_time or time.time()
prune_after, self.stats_manager.started_wall_time or time.time()
)
if dummy_link is not None:
@@ -923,7 +891,7 @@ class HubService:
def _queue_payload(
self, outgoing: list[tuple[RNS.Link, bytes]], link: RNS.Link, payload: bytes
) -> None:
self._inc("bytes_out", len(payload))
self.stats_manager.inc("bytes_out", len(payload))
outgoing.append((link, payload))
def _queue_env(
@@ -956,98 +924,13 @@ class HubService:
text: str,
room: str | None = None,
) -> None:
self._inc("errors_sent")
self.stats_manager.inc("errors_sent")
env = make_envelope(T_ERROR, src=src, room=room, body=text)
if outgoing is None:
self._send(link, env)
else:
self._queue_env(outgoing, link, env)
def _format_stats(self) -> str:
    """Build the multi-line, human-readable hub statistics report.

    Session, room, trust and counter figures are snapshotted together
    under ``self._state_lock``; the text itself is assembled after the
    lock is released.

    Returns:
        The report with one entry per line, separated by ``"\\n"``.
        ``uptime_s`` is 0.0 when no monotonic start time was recorded.
    """
    started_mono = self._started_monotonic
    uptime_s = (time.monotonic() - started_mono) if started_mono is not None else 0.0

    with self._state_lock:
        session_stats = self.session_manager.get_stats()
        room_stats = self.room_manager.get_stats()
        trusted_count = len(self._trusted)
        banned_count = len(self._banned)
        # Copy so the values cannot change while the report is formatted.
        c = dict(self._counters)

    def n(key: str) -> int:
        # Missing counters are reported as 0.
        return c.get(key, 0)

    cfg = self.config
    top_rooms = room_stats["top_rooms"]

    lines: list[str] = [
        f"rrcd {__version__} stats",
        f"uptime_s={uptime_s:.1f}",
        f"clients_total={session_stats['total']} "
        f"clients_identified={session_stats['identified']} "
        f"clients_welcomed={session_stats['welcomed']}",
        f"rooms={room_stats['rooms_total']} memberships={room_stats['memberships']}",
    ]
    if top_rooms:
        lines.append("top_rooms=" + ", ".join(f"{r}:{cnt}" for r, cnt in top_rooms))
    lines.append(f"trust: trusted={trusted_count} banned={banned_count}")
    lines.append(
        f"limits: rate_limit_msgs_per_minute={cfg.rate_limit_msgs_per_minute} "
        f"max_rooms_per_session={cfg.max_rooms_per_session} "
        f"max_room_name_len={cfg.max_room_name_len} "
        f"nick_max_chars={cfg.nick_max_chars}"
    )
    lines.append(
        f"features: ping_interval_s={cfg.ping_interval_s} "
        f"ping_timeout_s={cfg.ping_timeout_s} "
        f"announce_on_start={cfg.announce_on_start} "
        f"announce_period_s={cfg.announce_period_s}"
    )
    lines.append(
        f"io: pkts_in={n('pkts_in')} pkts_bad={n('pkts_bad')} "
        f"bytes_in={n('bytes_in')} bytes_out={n('bytes_out')}"
    )
    lines.append(
        f"events: joins={n('joins')} parts={n('parts')} "
        f"msgs_fwd={n('msgs_forwarded')} notices_fwd={n('notices_forwarded')} "
        f"errors_sent={n('errors_sent')} rate_limited={n('rate_limited')}"
    )
    lines.append(
        f"pings: in={n('pings_in')} out={n('pings_out')} "
        f"pongs: in={n('pongs_in')} out={n('pongs_out')}"
    )
    lines.append(
        f"resources: sent={n('resources_sent')} received={n('resources_received')} "
        f"rejected={n('resources_rejected')} bytes_sent={n('resource_bytes_sent')} "
        f"bytes_received={n('resource_bytes_received')}"
    )

    # One entry per line; joining on "" would fuse the whole report
    # into a single unreadable string.
    return "\n".join(lines)
def _on_link(self, link: RNS.Link) -> None:
with self._state_lock:
self.session_manager.on_link_established(link)
@@ -1107,7 +990,7 @@ class HubService:
# Send queued WELCOME first
for out_link, payload in outgoing:
self._inc("bytes_out", len(payload))
self.stats_manager.inc("bytes_out", len(payload))
try:
RNS.Packet(out_link, payload).send()
except OSError as e:
@@ -1160,7 +1043,7 @@ class HubService:
def _send(self, link: RNS.Link, env: dict) -> None:
payload = encode(env)
self._inc("bytes_out", len(payload))
self.stats_manager.inc("bytes_out", len(payload))
try:
RNS.Packet(link, payload).send()
except OSError as e:
@@ -1212,7 +1095,7 @@ class HubService:
)
for out_link, payload in outgoing:
self._inc("bytes_out", len(payload))
self.stats_manager.inc("bytes_out", len(payload))
try:
RNS.Packet(out_link, payload).send()
except OSError as e:
@@ -1294,7 +1177,7 @@ class HubService:
for link in to_ping:
ping = make_envelope(T_PING, src=self.identity.hash, body=now)
try:
self._inc("pings_out")
self.stats_manager.inc("pings_out")
self._send(link, ping)
except Exception:
pass
+160
View File
@@ -0,0 +1,160 @@
"""Statistics tracking and reporting for the RRC hub."""
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .service import HubService
class StatsManager:
    """
    Manages hub statistics collection and reporting.

    Tracks counters for:
    - Bytes in/out
    - Packets processed
    - Rate limiting events
    - Errors sent
    - Room joins/parts
    - Messages forwarded
    - Ping/pong activity
    - Announces
    - Resource transfers

    Counter updates and the snapshot taken by ``format_stats`` both run
    while holding the hub's ``_state_lock``.
    """

    def __init__(self, hub: HubService) -> None:
        """Create a manager bound to ``hub`` with all counters at zero.

        The start times stay ``None`` until ``set_start_time`` is called.
        """
        self.hub = hub
        self.log = hub.log

        self.started_wall_time: float | None = None
        self.started_monotonic: float | None = None

        # Lifetime counters for uptime statistics (monotonically increasing after startup).
        # Python int has arbitrary precision, so overflow is not a concern.
        self._counters: dict[str, int] = {
            "bytes_in": 0,
            "bytes_out": 0,
            "pkts_in": 0,
            "pkts_bad": 0,
            "rate_limited": 0,
            "errors_sent": 0,
            "joins": 0,
            "parts": 0,
            "msgs_forwarded": 0,
            "notices_forwarded": 0,
            "pings_in": 0,
            "pongs_in": 0,
            "pings_out": 0,
            "pongs_out": 0,
            "announces": 0,
            "resources_sent": 0,
            "resources_received": 0,
            "resources_rejected": 0,
            "resource_bytes_sent": 0,
            "resource_bytes_received": 0,
        }

    def set_start_time(self) -> None:
        """Record the current wall-clock and monotonic time as the start time."""
        self.started_wall_time = time.time()
        self.started_monotonic = time.monotonic()

    def inc(self, key: str, delta: int = 1) -> None:
        """Add ``delta`` to the counter named ``key``.

        A key that is not present yet is created starting from 0. Both the
        stored value and ``delta`` are coerced with ``int()``.

        Counting is best-effort: a failure (for example a ``delta`` that
        cannot be converted to ``int``) never propagates to the caller. It
        is logged at debug level instead of being dropped silently.
        """
        try:
            with self.hub._state_lock:
                self._counters[key] = int(self._counters.get(key, 0)) + int(delta)
        except Exception:
            # Statistics must never break packet handling, so swallow the
            # error -- but leave evidence for whoever is debugging the hub.
            self.log.debug("Failed to increment counter %r", key, exc_info=True)

    def format_stats(self) -> str:
        """Format current statistics as a human-readable, multi-line string.

        Session, room, trust and counter figures are snapshotted together
        under the hub's ``_state_lock``; the text itself is assembled after
        the lock is released.

        Returns:
            The report with one entry per line, separated by ``"\\n"``.
            ``uptime_s`` is 0.0 when ``set_start_time`` has not been called.
        """
        # Function-scope import -- presumably to avoid a circular import
        # with the package root; confirm before hoisting to module level.
        from . import __version__

        started_mono = self.started_monotonic
        uptime_s = (time.monotonic() - started_mono) if started_mono is not None else 0.0

        hub = self.hub
        with hub._state_lock:
            session_stats = hub.session_manager.get_stats()
            room_stats = hub.room_manager.get_stats()
            trusted_count = len(hub._trusted)
            banned_count = len(hub._banned)
            # Copy so the values cannot change while the report is formatted.
            c = dict(self._counters)

        def n(key: str) -> int:
            # Missing counters are reported as 0.
            return c.get(key, 0)

        cfg = hub.config
        top_rooms = room_stats["top_rooms"]

        lines: list[str] = [
            f"rrcd {__version__} stats",
            f"uptime_s={uptime_s:.1f}",
            f"clients_total={session_stats['total']} "
            f"clients_identified={session_stats['identified']} "
            f"clients_welcomed={session_stats['welcomed']}",
            f"rooms={room_stats['rooms_total']} memberships={room_stats['memberships']}",
        ]
        if top_rooms:
            lines.append("top_rooms=" + ", ".join(f"{r}:{cnt}" for r, cnt in top_rooms))
        lines.append(f"trust: trusted={trusted_count} banned={banned_count}")
        lines.append(
            f"limits: rate_limit_msgs_per_minute={cfg.rate_limit_msgs_per_minute} "
            f"max_rooms_per_session={cfg.max_rooms_per_session} "
            f"max_room_name_len={cfg.max_room_name_len} "
            f"nick_max_chars={cfg.nick_max_chars}"
        )
        lines.append(
            f"features: ping_interval_s={cfg.ping_interval_s} "
            f"ping_timeout_s={cfg.ping_timeout_s} "
            f"announce_on_start={cfg.announce_on_start} "
            f"announce_period_s={cfg.announce_period_s}"
        )
        lines.append(
            f"io: pkts_in={n('pkts_in')} pkts_bad={n('pkts_bad')} "
            f"bytes_in={n('bytes_in')} bytes_out={n('bytes_out')}"
        )
        lines.append(
            f"events: joins={n('joins')} parts={n('parts')} "
            f"msgs_fwd={n('msgs_forwarded')} notices_fwd={n('notices_forwarded')} "
            f"errors_sent={n('errors_sent')} rate_limited={n('rate_limited')}"
        )
        lines.append(
            f"pings: in={n('pings_in')} out={n('pings_out')} "
            f"pongs: in={n('pongs_in')} out={n('pongs_out')}"
        )
        lines.append(
            f"resources: sent={n('resources_sent')} received={n('resources_received')} "
            f"rejected={n('resources_rejected')} bytes_sent={n('resource_bytes_sent')} "
            f"bytes_received={n('resource_bytes_received')}"
        )

        # One entry per line; joining on "" would fuse the whole report
        # into a single unreadable string.
        return "\n".join(lines)