From f9625b2b88a00c8233114840520426a0dfd17dce Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 6 May 2026 04:24:07 +0200 Subject: [PATCH] Improved interface discovery data sanitization --- RNS/Discovery.py | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/RNS/Discovery.py b/RNS/Discovery.py index 47ab56ae..9422dc07 100644 --- a/RNS/Discovery.py +++ b/RNS/Discovery.py @@ -200,6 +200,15 @@ class InterfaceAnnounceHandler: self.callback = callback self.stamper = LXStamper + @staticmethod + def sanitize_name(name): + if not name: return None + name = name.encode("ascii", "ignore").decode("ascii").strip() + for i in [5,3,2]: name = name.replace(" "*i, " ") + while len(name) and name[0] not in san_map: name = name[1:] + while len(name) and name[-1] not in san_map+")": name = name[:-1] + return name + def received_announce(self, destination_hash, announced_identity, app_data): try: discovery_sources = RNS.Reticulum.interface_discovery_sources() @@ -234,10 +243,24 @@ class InterfaceAnnounceHandler: info = None unpacked = msgpack.unpackb(packed) if INTERFACE_TYPE in unpacked: - interface_type = unpacked[INTERFACE_TYPE] + interface_type = unpacked[INTERFACE_TYPE] + name = self.sanitize_name(unpacked[NAME]) + + if type(unpacked[TRANSPORT]) != bool: raise ValueError("Invalid data in transport field of announce") + if type(unpacked[LATITUDE]) not in [type(None), float]: raise ValueError("Invalid data in latitude field of announce") + if type(unpacked[LONGITUDE]) not in [type(None), float]: raise ValueError("Invalid data in longitude field of announce") + if type(unpacked[HEIGHT]) not in [type(None), float]: raise ValueError("Invalid data in height field of announce") + if len(unpacked[TRANSPORT_ID]) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError("Invalid data in transport_id field of announce") + if not interface_type in InterfaceAnnouncer.DISCOVERABLE_INTERFACE_TYPES: + raise ValueError("Invalid interface type in announce data") + + if REACHABLE_ON in unpacked: + if not (is_ip_address(unpacked[REACHABLE_ON]) or is_hostname(unpacked[REACHABLE_ON])): + raise ValueError("Invalid data in reachable_on field of announce") + info = {"type": interface_type, "transport": unpacked[TRANSPORT], - "name": unpacked[NAME] or f"Discovered {interface_type}", + "name": name or f"Discovered {interface_type}", "received": time.time(), "stamp": stamp, "value": value, @@ -248,12 +271,8 @@ class InterfaceAnnounceHandler: "longitude": unpacked[LONGITUDE], "height": unpacked[HEIGHT]} - if REACHABLE_ON in unpacked: - if not (is_ip_address(unpacked[REACHABLE_ON]) or is_hostname(unpacked[REACHABLE_ON])): - raise ValueError("Invalid data in reachable_on field of announce") - - if IFAC_NETNAME in unpacked: info["ifac_netname"] = unpacked[IFAC_NETNAME] - if IFAC_NETKEY in unpacked: info["ifac_netkey"] = unpacked[IFAC_NETKEY] + if IFAC_NETNAME in unpacked: info["ifac_netname"] = str(unpacked[IFAC_NETNAME]) + if IFAC_NETKEY in unpacked: info["ifac_netkey"] = str(unpacked[IFAC_NETKEY]) if interface_type in ["BackboneInterface", "TCPServerInterface"]: backbone_support = not RNS.vendor.platformutils.is_windows() @@ -386,6 +405,7 @@ class InterfaceDiscovery(): with open(filepath, "rb") as f: info = msgpack.unpackb(f.read()) should_remove = False heard_delta = now-info["last_heard"] + info["name"] = InterfaceAnnounceHandler.sanitize_name(info["name"]) if heard_delta > self.THRESHOLD_REMOVE: should_remove = True elif discovery_sources and not "network_id" in info: should_remove = True @@ -744,3 +764,8 @@ def is_hostname(hostname): if re.match(r"[0-9]+$", components[-1]): return False allowed = re.compile(r"(?!-)[a-z0-9-]{1,63}(?