From 60c440a3b6eeb0a60f68140ab33f10dfe377ce49 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 9 May 2026 01:14:40 +0200 Subject: [PATCH] Transport logic for path request ingress and egress control --- RNS/Packet.py | 3 +- RNS/Reticulum.py | 48 +++++++++++++++++++++++++++++ RNS/Transport.py | 79 ++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 113 insertions(+), 17 deletions(-) diff --git a/RNS/Packet.py b/RNS/Packet.py index f8ad70d8..053d9fbf 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -117,7 +117,7 @@ class Packet: __slots__ = "hops", "header", "header_type", "packet_type", "transport_type", "context", "context_flag", "destination" __slots__ += "transport_id", "data", "flags", "raw", "packed", "sent", "create_receipt", "receipt", "fromPacked", "MTU" __slots__ += "sent_at", "packet_hash", "ratchet_id", "attached_interface", "receiving_interface", "rssi", "snr", "q" - __slots__ += "ciphertext", "plaintext", "destination_hash", "destination_type", "link", "map_hash" + __slots__ += "ciphertext", "plaintext", "destination_hash", "destination_type", "link", "map_hash", "is_outbound_pr" def __init__(self, destination, data, packet_type = DATA, context = NONE, transport_type = RNS.Transport.BROADCAST, header_type = HEADER_1, transport_id = None, attached_interface = None, create_receipt = True, context_flag=FLAG_UNSET): @@ -161,6 +161,7 @@ class Packet: self.attached_interface = attached_interface self.receiving_interface = None + self.is_outbound_pr = False self.rssi = None self.snr = None self.q = None diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index a1b9d0fd..2e2db931 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -269,9 +269,13 @@ class Reticulum: Reticulum.__ic_burst_hold = None Reticulum.__ic_burst_freq_new = None Reticulum.__ic_burst_freq = None + Reticulum.__ic_pr_burst_freq_new = None + Reticulum.__ic_pr_burst_freq = None Reticulum.__ic_new_time = None Reticulum.__ic_burst_penalty = None Reticulum.__ic_held_release_interval = None + Reticulum.__ec_pr_freq = None + Reticulum.__egress_control = None Reticulum.panic_on_interface_error = False @@ -619,6 +623,22 @@ class Reticulum: v = self.config["reticulum"].as_float(option) if v >= 0: Reticulum.__ic_burst_freq = v + if option == "ic_pr_burst_freq_new": + v = self.config["reticulum"].as_float(option) + if v >= 0: Reticulum.__ic_pr_burst_freq_new = v + + if option == "ic_pr_burst_freq": + v = self.config["reticulum"].as_float(option) + if v >= 0: Reticulum.__ic_pr_burst_freq = v + + if option == "ec_pr_freq": + v = self.config["reticulum"].as_float(option) + if v >= 0: Reticulum.__ec_pr_freq = v + + if option == "egress_control": + v = self.config["reticulum"].as_bool(option) + if v >= 0: Reticulum.__egress_control = v + if option == "ic_new_time": v = self.config["reticulum"].as_float(option) if v >= 0: Reticulum.__ic_new_time = v @@ -719,6 +739,8 @@ class Reticulum: ingress_control = True if "ingress_control" in c: ingress_control = c.as_bool("ingress_control") + egress_control = None + if "egress_control" in c: egress_control = c.as_bool("egress_control") ic_max_held_announces = None if "ic_max_held_announces" in c: ic_max_held_announces = c.as_int("ic_max_held_announces") ic_burst_hold = None @@ -727,6 +749,12 @@ class Reticulum: if "ic_burst_freq_new" in c: ic_burst_freq_new = c.as_float("ic_burst_freq_new") ic_burst_freq = None if "ic_burst_freq" in c: ic_burst_freq = c.as_float("ic_burst_freq") + ic_pr_burst_freq_new = None + if "ic_pr_burst_freq_new" in c: ic_pr_burst_freq_new = c.as_float("ic_pr_burst_freq_new") + ic_pr_burst_freq = None + if "ic_pr_burst_freq" in c: ic_pr_burst_freq = c.as_float("ic_pr_burst_freq") + ec_pr_freq = None + if "ec_pr_freq" in c: ec_pr_freq = c.as_float("ec_pr_freq") ic_new_time = None if "ic_new_time" in c: ic_new_time = c.as_float("ic_new_time") ic_burst_penalty = None @@ -852,10 +880,14 @@ class Reticulum: interface.announce_rate_grace = announce_rate_grace interface.announce_rate_penalty = announce_rate_penalty interface.ingress_control = ingress_control + if egress_control != None: interface.egress_control = egress_control if ic_max_held_announces != None: interface.ic_max_held_announces = ic_max_held_announces if ic_burst_hold != None: interface.ic_burst_hold = ic_burst_hold if ic_burst_freq_new != None: interface.ic_burst_freq_new = ic_burst_freq_new if ic_burst_freq != None: interface.ic_burst_freq = ic_burst_freq + if ic_pr_burst_freq_new != None: interface.ic_pr_burst_freq_new = ic_pr_burst_freq_new + if ic_pr_burst_freq != None: interface.ic_pr_burst_freq = ic_pr_burst_freq + if ec_pr_freq != None: interface.ec_pr_freq = ec_pr_freq if ic_new_time != None: interface.ic_new_time = ic_new_time if ic_burst_penalty != None: interface.ic_burst_penalty = ic_burst_penalty if ic_held_release_interval != None: interface.ic_held_release_interval = ic_held_release_interval @@ -1069,6 +1101,18 @@ class Reticulum: def _default_ic_burst_freq(self): return self.__ic_burst_freq or RNS.Interfaces.Interface.Interface.IC_BURST_FREQ + def _default_ic_pr_burst_freq_new(self): + return self.__ic_pr_burst_freq_new or RNS.Interfaces.Interface.Interface.IC_PR_BURST_FREQ_NEW + + def _default_ic_pr_burst_freq(self): + return self.__ic_pr_burst_freq or RNS.Interfaces.Interface.Interface.IC_PR_BURST_FREQ + + def _default_ec_pr_freq(self): + return self.__ec_pr_freq or RNS.Interfaces.Interface.Interface.EC_PR_FREQ + + def _default_egress_control(self): + return self.__egress_control or RNS.Interfaces.Interface.Interface.EGRESS_CONTROL + def _default_ic_new_time(self): return self.__ic_new_time or RNS.Interfaces.Interface.Interface.IC_NEW_TIME @@ -1378,12 +1422,16 @@ class Reticulum: ifstats["txb"] = interface.txb ifstats["incoming_announce_frequency"] = interface.incoming_announce_frequency() ifstats["outgoing_announce_frequency"] = interface.outgoing_announce_frequency() + ifstats["incoming_pr_frequency"] = interface.incoming_pr_frequency() + ifstats["outgoing_pr_frequency"] = interface.outgoing_pr_frequency() ifstats["announce_rate_target"] = interface.announce_rate_target ifstats["announce_rate_penalty"] = interface.announce_rate_penalty ifstats["announce_rate_grace"] = interface.announce_rate_grace ifstats["held_announces"] = len(interface.held_announces) ifstats["burst_active"] = interface.ic_burst_active ifstats["burst_activated"] = interface.ic_burst_activated + ifstats["pr_burst_active"] = interface.ic_pr_burst_active + ifstats["pr_burst_activated"] = interface.ic_pr_burst_activated ifstats["status"] = interface.online ifstats["mode"] = interface.mode diff --git a/RNS/Transport.py b/RNS/Transport.py index 5b16daa9..c10085e8 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -38,6 +38,7 @@ import inspect import threading from time import sleep from threading import Lock +from collections import deque from .vendor import umsgpack as umsgpack from RNS.Interfaces.BackboneInterface import BackboneInterface @@ -124,6 +125,7 @@ class Transport: discovery_path_requests = {} # A table for keeping track of path requests on behalf of other nodes discovery_pr_tags = [] # A table for keeping track of tagged path requests max_pr_tags = 32000 # Maximum amount of unique path request tags to remember + max_queued_discovery_prs = 32 # Maximum amount of queued discovery path requests interfaces_lock = Lock() destinations_lock = Lock() @@ -917,6 +919,7 @@ class Transport: Transport.prioritize_interfaces() try: for interface in Transport.interfaces: + interface.should_ingress_limit() interface.process_held_announces() if interface.phy_keepalive: interface.send_keepalive() Transport.interface_last_jobs = time.time() @@ -980,15 +983,50 @@ class Transport: RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.trace_exception(e) # TODO: Remove - for packet in outgoing: packet.send() + if outgoing: + def job(): Transport.handle_outgoing_announces(outgoing) + threading.Thread(target=job).start() - for destination_hash in path_requests: - blocked_if = path_requests[destination_hash] - if blocked_if == None: Transport.request_path(destination_hash) - else: - for interface in Transport.interfaces: - if interface != blocked_if: Transport.request_path(destination_hash, on_interface=interface) - else: pass + if path_requests: + with Transport.discovery_pr_tx_lock: + for destination_hash in path_requests: + if not destination_hash in Transport.pending_discovery_prs: + if not len(Transport.pending_discovery_prs) >= Transport.max_queued_discovery_prs: + Transport.pending_discovery_prs.append([destination_hash, path_requests[destination_hash]]) + + if len(Transport.pending_discovery_prs): + def job(): Transport.handle_disovery_path_requests() + threading.Thread(target=job).start() + + + discovery_pr_tx_throttle = 0.5 + discovery_pr_tx_lock = Lock() + discovery_pr_handle_lock = Lock() + pending_discovery_prs = deque(maxlen=max_queued_discovery_prs) + @staticmethod + def handle_disovery_path_requests(): + if Transport.discovery_pr_handle_lock.locked(): return + with Transport.discovery_pr_handle_lock: + while len(Transport.pending_discovery_prs): + time.sleep(Transport.discovery_pr_tx_throttle) + destination_hash = None + blocked_if = None + with Transport.discovery_pr_tx_lock: + entry = Transport.pending_discovery_prs.popleft() + destination_hash = entry[0] + blocked_if = entry[1] + + if destination_hash: + if blocked_if == None: Transport.request_path(destination_hash) + else: + for interface in Transport.interfaces: + if interface != blocked_if: Transport.request_path(destination_hash, on_interface=interface) + else: pass + + + @staticmethod + def handle_outgoing_announces(outgoing): + for packet in sorted(outgoing, key=lambda p: p.hops): packet.send() @staticmethod def transmit(interface, raw): @@ -1263,6 +1301,7 @@ class Transport: Transport.transmit(interface, packet.raw) if packet.packet_type == RNS.Packet.ANNOUNCE: interface.sent_announce() + if packet.destination.type == RNS.Destination.PLAIN and packet.is_outbound_pr: interface.sent_path_request() packet_sent(packet) sent = True @@ -1629,10 +1668,12 @@ class Transport: # announces, queueing rebroadcasts of these, and removal # of queued announce rebroadcasts once handed to the next node. if packet.packet_type == RNS.Packet.ANNOUNCE: - if interface != None and RNS.Identity.validate_announce(packet, only_validate_signature=True): - interface.received_announce() + announce_signature_valid = RNS.Identity.validate_announce(packet, only_validate_signature=True) + if not announce_signature_valid: return + elif interface != None: interface.received_announce() + announced_destination_known = packet.destination_hash in Transport.path_table - if not packet.destination_hash in Transport.path_table: + if not announced_destination_known: # This is an unknown destination, and we'll apply # potential ingress limiting. Already known # destinations will have re-announces controlled @@ -1694,7 +1735,7 @@ class Transport: random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10] random_blobs = [] with Transport.inbound_announce_lock: - if packet.destination_hash in Transport.path_table: + if announced_destination_known: random_blobs = Transport.path_table[packet.destination_hash][IDX_PT_RANDBLOBS] # If we already have a path to the announced @@ -2747,8 +2788,10 @@ class Transport: wait_time = (tx_time / on_interface.announce_cap) on_interface.announce_allowed_at = now + wait_time + packet.is_outbound_pr = True packet.send() - Transport.path_requests[destination_hash] = time.time() + + with Transport.path_requests_lock: Transport.path_requests[destination_hash] = time.time() @staticmethod def remote_status_handler(path, data, request_id, link_id, remote_identity, requested_at): @@ -2832,6 +2875,7 @@ class Transport: unique_tag = destination_hash+tag_bytes + if packet.receiving_interface: packet.receiving_interface.received_path_request() with Transport.discovery_pr_tags_lock: if not unique_tag in Transport.discovery_pr_tags: Transport.discovery_pr_tags.append(unique_tag) @@ -2964,9 +3008,12 @@ class Transport: for interface in Transport.interfaces: if not interface == attached_interface: - # Use the previously extracted tag from this path request - # on the new path requests as well, to avoid potential loops - Transport.request_path(destination_hash, on_interface=interface, tag=tag, recursive=True) + if interface.should_egress_limit_pr(): + RNS.log(f"Not sending recursive path request on {interface} due to active egress limiting", RNS.LOG_DEBUG) if RNS.sl(RNS.LOG_DEBUG) else None + else: + # Use the previously extracted tag from this path request + # on the new path requests as well, to avoid potential loops + Transport.request_path(destination_hash, on_interface=interface, tag=tag, recursive=True) elif not is_from_local_client and len(Transport.local_client_interfaces) > 0: # Forward the path request on all local