diff --git a/RNS/Interfaces/BackboneInterface.py b/RNS/Interfaces/BackboneInterface.py index 39e436bd..14cab008 100644 --- a/RNS/Interfaces/BackboneInterface.py +++ b/RNS/Interfaces/BackboneInterface.py @@ -156,6 +156,32 @@ class BackboneInterface(Interface): else: raise SystemError("Insufficient parameters to create listener") + + __last_ic_burst_check = 0 + __last_ic_burst_state = False + @property + def ic_burst_active(self): + if time.time() > self.__last_ic_burst_check + 2: + self.__last_ic_burst_state = any(i.ic_burst_active for i in self.spawned_interfaces) + + return self.__last_ic_burst_state + + @ic_burst_active.setter + def ic_burst_active(self, value): pass + + __ic_burst_activated_check = 0 + __ic_burst_activated = 0 + @property + def ic_burst_activated(self): + if time.time() > self.__ic_burst_activated_check + 2: + activated = [i.ic_burst_activated for i in self.spawned_interfaces if i.ic_burst_active] + if activated: self.__ic_burst_activated = min(activated) + + return self.__ic_burst_activated + + @ic_burst_activated.setter + def ic_burst_activated(self, value): pass + @staticmethod def start(): if not BackboneInterface._job_active: threading.Thread(target=BackboneInterface.__job, daemon=True).start() @@ -206,7 +232,7 @@ class BackboneInterface(Interface): @staticmethod def deregister_fileno(fileno): if fileno < 0: - RNS.log(f"Attempt to deregister invalid file descriptor {fileno}", RNS.LOG_ERROR) + RNS.log(f"Attempt to deregister invalid file descriptor {fileno}", RNS.LOG_WARNING) return try: BackboneInterface.epoll.unregister(fileno) @@ -320,11 +346,17 @@ class BackboneInterface(Interface): elif fileno in BackboneInterface.listener_filenos: owner_interface, server_socket = BackboneInterface.listener_filenos[fileno] if fileno == server_socket.fileno() and (event & select.EPOLLIN): - client_socket, address = server_socket.accept() - client_socket.setblocking(0) - if not owner_interface.incoming_connection(client_socket): + try: + client_socket, address = server_socket.accept() + client_socket.setblocking(0) + if not owner_interface.incoming_connection(client_socket): + try: client_socket.close() + except Exception as e: RNS.log(f"Error while closing socket for failed incoming connection: {e}", RNS.LOG_ERROR) + + except: + RNS.log(f"Accepting socket failed for incoming connection: {e}", RNS.LOG_WARNING) try: client_socket.close() - except Exception as e: RNS.log(f"Error while closing socket for failed incoming connection: {e}", RNS.LOG_ERROR) + except Exception as e: RNS.log(f"Error while closing socket for failed incoming socket accept: {e}", RNS.LOG_WARNING) elif fileno == server_socket.fileno() and (event & select.EPOLLHUP): try: BackboneInterface.deregister_fileno(fileno) @@ -408,6 +440,12 @@ class BackboneInterface(Interface): def sent_announce(self, from_spawned=False): if from_spawned: self.oa_freq_deque.append(time.time()) + def received_path_request(self, from_spawned=False): + if from_spawned: self.ip_freq_deque.append(time.time()) + + def sent_path_request(self, from_spawned=False): + if from_spawned: self.op_freq_deque.append(time.time()) + def process_outgoing(self, data): pass diff --git a/RNS/Interfaces/I2PInterface.py b/RNS/Interfaces/I2PInterface.py index 0d262b6d..88acfa35 100644 --- a/RNS/Interfaces/I2PInterface.py +++ b/RNS/Interfaces/I2PInterface.py @@ -1003,6 +1003,12 @@ class I2PInterface(Interface): def sent_announce(self, from_spawned=False): if from_spawned: self.oa_freq_deque.append(time.time()) + def received_path_request(self, from_spawned=False): + if from_spawned: self.ip_freq_deque.append(time.time()) + + def sent_path_request(self, from_spawned=False): + if from_spawned: self.op_freq_deque.append(time.time()) + def detach(self): RNS.log("Detaching "+str(self), RNS.LOG_DEBUG) self.i2p.stop() diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index ed2648c1..8ae875cb 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -55,8 +55,15 @@ class Interface: # How many samples to use for announce # frequency calculations - IA_FREQ_SAMPLES = 128 - OA_FREQ_SAMPLES = 128 + IA_FREQ_SAMPLES = 48 + OA_FREQ_SAMPLES = 48 + IP_FREQ_SAMPLES = 48 + OP_FREQ_SAMPLES = 48 + + AR_MINFREQ_HZ = 0.1 + PR_MINFREQ_HZ = 0.1 + AR_FREQ_DECAY = 1/AR_MINFREQ_HZ + PR_FREQ_DECAY = 1/PR_MINFREQ_HZ # Maximum amount of ingress limited announces # to hold at any given time. @@ -68,10 +75,15 @@ class Interface: IC_NEW_TIME = 2*60*60 IC_BURST_FREQ_NEW = 3 IC_BURST_FREQ = 10 + IC_PR_BURST_FREQ_NEW = 3 + IC_PR_BURST_FREQ = 10 IC_BURST_HOLD = 15 IC_BURST_PENALTY = 15 IC_HELD_RELEASE_INTERVAL = 5 - IC_DEQUE_MIN_SAMPLE = 32 + IC_DEQUE_MIN_SAMPLE = 2 + IC_BURST_MIN_SAMPLES = 8 + EC_PR_FREQ = 5 + EGRESS_CONTROL = False # Default announce rate targets DEFAULT_AR_TARGET = 3600 @@ -102,18 +114,26 @@ class Interface: self.ic_burst_active = False self.ic_burst_activated = 0 + self.ic_pr_burst_active = False + self.ic_pr_burst_activated = 0 self.ic_held_release = 0 self.ic_max_held_announces = RNS.Reticulum.get_instance()._default_ic_max_held_announces() self.ic_burst_hold = RNS.Reticulum.get_instance()._default_ic_burst_hold() self.ic_burst_freq_new = RNS.Reticulum.get_instance()._default_ic_burst_freq_new() self.ic_burst_freq = RNS.Reticulum.get_instance()._default_ic_burst_freq() + self.ic_pr_burst_freq_new = RNS.Reticulum.get_instance()._default_ic_pr_burst_freq_new() + self.ic_pr_burst_freq = RNS.Reticulum.get_instance()._default_ic_pr_burst_freq() self.ic_new_time = RNS.Reticulum.get_instance()._default_ic_new_time() self.ic_burst_penalty = RNS.Reticulum.get_instance()._default_ic_burst_penalty() self.ic_held_release_interval = RNS.Reticulum.get_instance()._default_ic_held_release_interval() + self.ec_pr_freq = RNS.Reticulum.get_instance()._default_ec_pr_freq() + self.egress_control = RNS.Reticulum.get_instance()._default_egress_control() self.held_announces = {} self.ia_freq_deque = deque(maxlen=Interface.IA_FREQ_SAMPLES) self.oa_freq_deque = deque(maxlen=Interface.OA_FREQ_SAMPLES) + self.ip_freq_deque = deque(maxlen=Interface.IA_FREQ_SAMPLES) + self.op_freq_deque = deque(maxlen=Interface.OA_FREQ_SAMPLES) def get_hash(self): return RNS.Identity.full_hash(str(self).encode("utf-8")) @@ -129,7 +149,7 @@ class Interface: if self.ic_burst_active: if ia_freq < freq_threshold and time.time() > self.ic_burst_activated+self.ic_burst_hold: - self.ic_burst_active = False + if len(self.ia_freq_deque) >= self.IC_BURST_MIN_SAMPLES: self.ic_burst_active = False return True @@ -144,6 +164,37 @@ class Interface: else: return False + def should_ingress_limit_pr(self): + if self.ingress_control: + freq_threshold = self.ic_pr_burst_freq_new if self.age() < self.ic_new_time else self.ic_pr_burst_freq + ip_freq = self.incoming_pr_frequency() + + if self.ic_pr_burst_active: + if ip_freq < freq_threshold and time.time() > self.ic_pr_burst_activated+self.ic_burst_hold: + self.ic_pr_burst_active = False + + return True + + else: + if ip_freq > freq_threshold: + self.ic_pr_burst_active = True + self.ic_pr_burst_activated = time.time() + return True + + else: return False + + else: return False + + def should_egress_limit_pr(self): + if self.egress_control: + freq_threshold = self.ec_pr_freq + op_freq = self.outgoing_pr_frequency() + + if op_freq > freq_threshold: + if len(self.op_freq_deque) >= self.IC_BURST_MIN_SAMPLES: return True + + return False + def optimise_mtu(self): if self.AUTOCONFIGURE_MTU: if self.bitrate >= 1_000_000_000: @@ -169,7 +220,7 @@ class Interface: else: self.HW_MTU = None - RNS.log(f"{self} hardware MTU set to {self.HW_MTU}", RNS.LOG_DEBUG) # TODO: Remove debug + RNS.log(f"{self} hardware MTU set to {self.HW_MTU}", RNS.LOG_DEBUG) def age(self): return time.time()-self.created @@ -215,12 +266,23 @@ class Interface: if hasattr(self, "parent_interface") and self.parent_interface != None: self.parent_interface.sent_announce(from_spawned=True) + def received_path_request(self, from_spawned=False): + self.ip_freq_deque.append(time.time()) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.received_path_request(from_spawned=True) + + def sent_path_request(self, from_spawned=False): + self.op_freq_deque.append(time.time()) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.sent_path_request(from_spawned=True) + def incoming_announce_frequency(self): n = len(self.ia_freq_deque) if not n > self.IC_DEQUE_MIN_SAMPLE: return 0 else: oldest = self.ia_freq_deque[0] span = time.time() - oldest + if span > self.AR_FREQ_DECAY: self.ia_freq_deque.popleft() if span <= 0: return 0 hz = n / span return hz @@ -231,6 +293,29 @@ class Interface: else: oldest = self.oa_freq_deque[0] span = time.time() - oldest + if span > self.AR_FREQ_DECAY: self.oa_freq_deque.popleft() + if span <= 0: return 0 + hz = n / span + return hz + + def incoming_pr_frequency(self): + n = len(self.ip_freq_deque) + if not n > self.IC_DEQUE_MIN_SAMPLE: return 0 + else: + oldest = self.ip_freq_deque[0] + span = time.time() - oldest + if span > self.PR_FREQ_DECAY: self.ip_freq_deque.popleft() + if span <= 0: return 0 + hz = n / span + return hz + + def outgoing_pr_frequency(self): + n = len(self.op_freq_deque) + if not len(self.op_freq_deque) > 1: return 0 + else: + oldest = self.op_freq_deque[0] + span = time.time() - oldest + if span > self.PR_FREQ_DECAY: self.op_freq_deque.popleft() if span <= 0: return 0 hz = n / span return hz diff --git a/RNS/Interfaces/LocalInterface.py b/RNS/Interfaces/LocalInterface.py index 4fd8c62e..e1ae6c53 100644 --- a/RNS/Interfaces/LocalInterface.py +++ b/RNS/Interfaces/LocalInterface.py @@ -488,6 +488,12 @@ class LocalServerInterface(Interface): def sent_announce(self, from_spawned=False): if from_spawned: self.oa_freq_deque.append(time.time()) + def received_path_request(self, from_spawned=False): + if from_spawned: self.ip_freq_deque.append(time.time()) + + def sent_path_request(self, from_spawned=False): + if from_spawned: self.op_freq_deque.append(time.time()) + def __str__(self): if self.socket_path: return "Shared Instance["+str(self.socket_path.replace("\0", ""))+"]" else: return "Shared Instance["+str(self.bind_port)+"]" diff --git a/RNS/Interfaces/RNodeMultiInterface.py b/RNS/Interfaces/RNodeMultiInterface.py index bf9d0dd6..c6416279 100644 --- a/RNS/Interfaces/RNodeMultiInterface.py +++ b/RNS/Interfaces/RNodeMultiInterface.py @@ -549,6 +549,12 @@ class RNodeMultiInterface(Interface): def sent_announce(self, from_spawned=False): if from_spawned: self.oa_freq_deque.append(time.time()) + def received_path_request(self, from_spawned=False): + if from_spawned: self.ip_freq_deque.append(time.time()) + + def sent_path_request(self, from_spawned=False): + if from_spawned: self.op_freq_deque.append(time.time()) + def readLoop(self): try: in_frame = False diff --git a/RNS/Interfaces/TCPInterface.py b/RNS/Interfaces/TCPInterface.py index b521a46c..59698fde 100644 --- a/RNS/Interfaces/TCPInterface.py +++ b/RNS/Interfaces/TCPInterface.py @@ -634,6 +634,12 @@ class TCPServerInterface(Interface): def sent_announce(self, from_spawned=False): if from_spawned: self.oa_freq_deque.append(time.time()) + def received_path_request(self, from_spawned=False): + if from_spawned: self.ip_freq_deque.append(time.time()) + + def sent_path_request(self, from_spawned=False): + if from_spawned: self.op_freq_deque.append(time.time()) + def process_outgoing(self, data): pass