Improved interface discovery data sanitization

This commit is contained in:
Mark Qvist
2026-05-06 04:24:07 +02:00
parent 3d8079c02b
commit f9625b2b88
+33 -8
View File
@@ -200,6 +200,15 @@ class InterfaceAnnounceHandler:
self.callback = callback
self.stamper = LXStamper
@staticmethod
def sanitize_name(name):
if not name: return None
name = name.encode("ascii", "ignore").decode("ascii").strip()
for i in [5,3,2]: name = name.replace(" "*i, " ")
while len(name) and name[0] not in san_map: name = name[1:]
while len(name) and name[-1] not in san_map+")": name = name[:-1]
return name
def received_announce(self, destination_hash, announced_identity, app_data):
try:
discovery_sources = RNS.Reticulum.interface_discovery_sources()
@@ -234,10 +243,24 @@ class InterfaceAnnounceHandler:
info = None
unpacked = msgpack.unpackb(packed)
if INTERFACE_TYPE in unpacked:
interface_type = unpacked[INTERFACE_TYPE]
interface_type = unpacked[INTERFACE_TYPE]
name = self.sanitize_name(unpacked[NAME])
if type(unpacked[TRANSPORT]) != bool: raise ValueError("Invalid data in transport field of announce")
if type(unpacked[LATITUDE]) not in [type(None), float]: raise ValueError("Invalid data in latitude field of announce")
if type(unpacked[LONGITUDE]) not in [type(None), float]: raise ValueError("Invalid data in longitude field of announce")
if type(unpacked[HEIGHT]) not in [type(None), float]: raise ValueError("Invalid data in height field of announce")
if len(unpacked[TRANSPORT_ID]) != RNS.Identity.TRUNCATED_HASHLENGTH//8: raise ValueError("Invalid data in transport_id field of announce")
if not interface_type in InterfaceAnnouncer.DISCOVERABLE_INTERFACE_TYPES:
raise ValueError("Invalid interface type in announce data")
if REACHABLE_ON in unpacked:
if not (is_ip_address(unpacked[REACHABLE_ON]) or is_hostname(unpacked[REACHABLE_ON])):
raise ValueError("Invalid data in reachable_on field of announce")
info = {"type": interface_type,
"transport": unpacked[TRANSPORT],
"name": unpacked[NAME] or f"Discovered {interface_type}",
"name": name or f"Discovered {interface_type}",
"received": time.time(),
"stamp": stamp,
"value": value,
@@ -248,12 +271,8 @@ class InterfaceAnnounceHandler:
"longitude": unpacked[LONGITUDE],
"height": unpacked[HEIGHT]}
if REACHABLE_ON in unpacked:
if not (is_ip_address(unpacked[REACHABLE_ON]) or is_hostname(unpacked[REACHABLE_ON])):
raise ValueError("Invalid data in reachable_on field of announce")
if IFAC_NETNAME in unpacked: info["ifac_netname"] = unpacked[IFAC_NETNAME]
if IFAC_NETKEY in unpacked: info["ifac_netkey"] = unpacked[IFAC_NETKEY]
if IFAC_NETNAME in unpacked: info["ifac_netname"] = str(unpacked[IFAC_NETNAME])
if IFAC_NETKEY in unpacked: info["ifac_netkey"] = str(unpacked[IFAC_NETKEY])
if interface_type in ["BackboneInterface", "TCPServerInterface"]:
backbone_support = not RNS.vendor.platformutils.is_windows()
@@ -386,6 +405,7 @@ class InterfaceDiscovery():
with open(filepath, "rb") as f: info = msgpack.unpackb(f.read())
should_remove = False
heard_delta = now-info["last_heard"]
info["name"] = InterfaceAnnounceHandler.sanitize_name(info["name"])
if heard_delta > self.THRESHOLD_REMOVE: should_remove = True
elif discovery_sources and not "network_id" in info: should_remove = True
@@ -744,3 +764,8 @@ def is_hostname(hostname):
if re.match(r"[0-9]+$", components[-1]): return False
allowed = re.compile(r"(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
return all(allowed.match(label) for label in components)
san_map = ""
for i in range(48, 58): san_map += bytes([i]).decode("ascii")
for i in range(65, 91): san_map += bytes([i]).decode("ascii")
for i in range(97, 123): san_map += bytes([i]).decode("ascii")