mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-06-08 14:11:53 -07:00
Transport logic for path request ingress and egress control
This commit is contained in:
+2
-1
@@ -117,7 +117,7 @@ class Packet:
|
||||
__slots__ = "hops", "header", "header_type", "packet_type", "transport_type", "context", "context_flag", "destination"
|
||||
__slots__ += "transport_id", "data", "flags", "raw", "packed", "sent", "create_receipt", "receipt", "fromPacked", "MTU"
|
||||
__slots__ += "sent_at", "packet_hash", "ratchet_id", "attached_interface", "receiving_interface", "rssi", "snr", "q"
|
||||
__slots__ += "ciphertext", "plaintext", "destination_hash", "destination_type", "link", "map_hash"
|
||||
__slots__ += "ciphertext", "plaintext", "destination_hash", "destination_type", "link", "map_hash", "is_outbound_pr"
|
||||
|
||||
def __init__(self, destination, data, packet_type = DATA, context = NONE, transport_type = RNS.Transport.BROADCAST,
|
||||
header_type = HEADER_1, transport_id = None, attached_interface = None, create_receipt = True, context_flag=FLAG_UNSET):
|
||||
@@ -161,6 +161,7 @@ class Packet:
|
||||
|
||||
self.attached_interface = attached_interface
|
||||
self.receiving_interface = None
|
||||
self.is_outbound_pr = False
|
||||
self.rssi = None
|
||||
self.snr = None
|
||||
self.q = None
|
||||
|
||||
@@ -269,9 +269,13 @@ class Reticulum:
|
||||
Reticulum.__ic_burst_hold = None
|
||||
Reticulum.__ic_burst_freq_new = None
|
||||
Reticulum.__ic_burst_freq = None
|
||||
Reticulum.__ic_pr_burst_freq_new = None
|
||||
Reticulum.__ic_pr_burst_freq = None
|
||||
Reticulum.__ic_new_time = None
|
||||
Reticulum.__ic_burst_penalty = None
|
||||
Reticulum.__ic_held_release_interval = None
|
||||
Reticulum.__ec_pr_freq = None
|
||||
Reticulum.__egress_control = None
|
||||
|
||||
Reticulum.panic_on_interface_error = False
|
||||
|
||||
@@ -619,6 +623,22 @@ class Reticulum:
|
||||
v = self.config["reticulum"].as_float(option)
|
||||
if v >= 0: Reticulum.__ic_burst_freq = v
|
||||
|
||||
if option == "ic_pr_burst_freq_new":
|
||||
v = self.config["reticulum"].as_float(option)
|
||||
if v >= 0: Reticulum.__ic_pr_burst_freq_new = v
|
||||
|
||||
if option == "ic_pr_burst_freq":
|
||||
v = self.config["reticulum"].as_float(option)
|
||||
if v >= 0: Reticulum.__ic_pr_burst_freq = v
|
||||
|
||||
if option == "ec_pr_freq":
|
||||
v = self.config["reticulum"].as_float(option)
|
||||
if v >= 0: Reticulum.__ec_pr_freq = v
|
||||
|
||||
if option == "egress_control":
|
||||
v = self.config["reticulum"].as_bool(option)
|
||||
if v >= 0: Reticulum.__egress_control = v
|
||||
|
||||
if option == "ic_new_time":
|
||||
v = self.config["reticulum"].as_float(option)
|
||||
if v >= 0: Reticulum.__ic_new_time = v
|
||||
@@ -719,6 +739,8 @@ class Reticulum:
|
||||
|
||||
ingress_control = True
|
||||
if "ingress_control" in c: ingress_control = c.as_bool("ingress_control")
|
||||
egress_control = None
|
||||
if "egress_control" in c: egress_control = c.as_bool("egress_control")
|
||||
ic_max_held_announces = None
|
||||
if "ic_max_held_announces" in c: ic_max_held_announces = c.as_int("ic_max_held_announces")
|
||||
ic_burst_hold = None
|
||||
@@ -727,6 +749,12 @@ class Reticulum:
|
||||
if "ic_burst_freq_new" in c: ic_burst_freq_new = c.as_float("ic_burst_freq_new")
|
||||
ic_burst_freq = None
|
||||
if "ic_burst_freq" in c: ic_burst_freq = c.as_float("ic_burst_freq")
|
||||
ic_pr_burst_freq_new = None
|
||||
if "ic_pr_burst_freq_new" in c: ic_pr_burst_freq_new = c.as_float("ic_pr_burst_freq_new")
|
||||
ic_pr_burst_freq = None
|
||||
if "ic_pr_burst_freq" in c: ic_pr_burst_freq = c.as_float("ic_pr_burst_freq")
|
||||
ec_pr_freq = None
|
||||
if "ec_pr_freq" in c: ec_pr_freq = c.as_float("ec_pr_freq")
|
||||
ic_new_time = None
|
||||
if "ic_new_time" in c: ic_new_time = c.as_float("ic_new_time")
|
||||
ic_burst_penalty = None
|
||||
@@ -852,10 +880,14 @@ class Reticulum:
|
||||
interface.announce_rate_grace = announce_rate_grace
|
||||
interface.announce_rate_penalty = announce_rate_penalty
|
||||
interface.ingress_control = ingress_control
|
||||
if egress_control != None: interface.egress_control = egress_control
|
||||
if ic_max_held_announces != None: interface.ic_max_held_announces = ic_max_held_announces
|
||||
if ic_burst_hold != None: interface.ic_burst_hold = ic_burst_hold
|
||||
if ic_burst_freq_new != None: interface.ic_burst_freq_new = ic_burst_freq_new
|
||||
if ic_burst_freq != None: interface.ic_burst_freq = ic_burst_freq
|
||||
if ic_pr_burst_freq_new != None: interface.ic_pr_burst_freq_new = ic_pr_burst_freq_new
|
||||
if ic_pr_burst_freq != None: interface.ic_pr_burst_freq = ic_pr_burst_freq
|
||||
if ec_pr_freq != None: interface.ec_pr_freq = ec_pr_freq
|
||||
if ic_new_time != None: interface.ic_new_time = ic_new_time
|
||||
if ic_burst_penalty != None: interface.ic_burst_penalty = ic_burst_penalty
|
||||
if ic_held_release_interval != None: interface.ic_held_release_interval = ic_held_release_interval
|
||||
@@ -1069,6 +1101,18 @@ class Reticulum:
|
||||
def _default_ic_burst_freq(self):
|
||||
return self.__ic_burst_freq or RNS.Interfaces.Interface.Interface.IC_BURST_FREQ
|
||||
|
||||
def _default_ic_pr_burst_freq_new(self):
|
||||
return self.__ic_pr_burst_freq_new or RNS.Interfaces.Interface.Interface.IC_PR_BURST_FREQ_NEW
|
||||
|
||||
def _default_ic_pr_burst_freq(self):
|
||||
return self.__ic_pr_burst_freq or RNS.Interfaces.Interface.Interface.IC_PR_BURST_FREQ
|
||||
|
||||
def _default_ec_pr_freq(self):
|
||||
return self.__ec_pr_freq or RNS.Interfaces.Interface.Interface.EC_PR_FREQ
|
||||
|
||||
def _default_egress_control(self):
|
||||
return self.__egress_control or RNS.Interfaces.Interface.Interface.EGRESS_CONTROL
|
||||
|
||||
def _default_ic_new_time(self):
|
||||
return self.__ic_new_time or RNS.Interfaces.Interface.Interface.IC_NEW_TIME
|
||||
|
||||
@@ -1378,12 +1422,16 @@ class Reticulum:
|
||||
ifstats["txb"] = interface.txb
|
||||
ifstats["incoming_announce_frequency"] = interface.incoming_announce_frequency()
|
||||
ifstats["outgoing_announce_frequency"] = interface.outgoing_announce_frequency()
|
||||
ifstats["incoming_pr_frequency"] = interface.incoming_pr_frequency()
|
||||
ifstats["outgoing_pr_frequency"] = interface.outgoing_pr_frequency()
|
||||
ifstats["announce_rate_target"] = interface.announce_rate_target
|
||||
ifstats["announce_rate_penalty"] = interface.announce_rate_penalty
|
||||
ifstats["announce_rate_grace"] = interface.announce_rate_grace
|
||||
ifstats["held_announces"] = len(interface.held_announces)
|
||||
ifstats["burst_active"] = interface.ic_burst_active
|
||||
ifstats["burst_activated"] = interface.ic_burst_activated
|
||||
ifstats["pr_burst_active"] = interface.ic_pr_burst_active
|
||||
ifstats["pr_burst_activated"] = interface.ic_pr_burst_activated
|
||||
ifstats["status"] = interface.online
|
||||
ifstats["mode"] = interface.mode
|
||||
|
||||
|
||||
+63
-16
@@ -38,6 +38,7 @@ import inspect
|
||||
import threading
|
||||
from time import sleep
|
||||
from threading import Lock
|
||||
from collections import deque
|
||||
from .vendor import umsgpack as umsgpack
|
||||
from RNS.Interfaces.BackboneInterface import BackboneInterface
|
||||
|
||||
@@ -124,6 +125,7 @@ class Transport:
|
||||
discovery_path_requests = {} # A table for keeping track of path requests on behalf of other nodes
|
||||
discovery_pr_tags = [] # A table for keeping track of tagged path requests
|
||||
max_pr_tags = 32000 # Maximum amount of unique path request tags to remember
|
||||
max_queued_discovery_prs = 32 # Maximum amount of queued discovery path requests
|
||||
|
||||
interfaces_lock = Lock()
|
||||
destinations_lock = Lock()
|
||||
@@ -917,6 +919,7 @@ class Transport:
|
||||
Transport.prioritize_interfaces()
|
||||
try:
|
||||
for interface in Transport.interfaces:
|
||||
interface.should_ingress_limit()
|
||||
interface.process_held_announces()
|
||||
if interface.phy_keepalive: interface.send_keepalive()
|
||||
Transport.interface_last_jobs = time.time()
|
||||
@@ -980,15 +983,50 @@ class Transport:
|
||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e) # TODO: Remove
|
||||
|
||||
for packet in outgoing: packet.send()
|
||||
if outgoing:
|
||||
def job(): Transport.handle_outgoing_announces(outgoing)
|
||||
threading.Thread(target=job).start()
|
||||
|
||||
for destination_hash in path_requests:
|
||||
blocked_if = path_requests[destination_hash]
|
||||
if blocked_if == None: Transport.request_path(destination_hash)
|
||||
else:
|
||||
for interface in Transport.interfaces:
|
||||
if interface != blocked_if: Transport.request_path(destination_hash, on_interface=interface)
|
||||
else: pass
|
||||
if path_requests:
|
||||
with Transport.discovery_pr_tx_lock:
|
||||
for destination_hash in path_requests:
|
||||
if not destination_hash in Transport.pending_discovery_prs:
|
||||
if not len(Transport.pending_discovery_prs) >= Transport.max_queued_discovery_prs:
|
||||
Transport.pending_discovery_prs.append([destination_hash, path_requests[destination_hash]])
|
||||
|
||||
if len(Transport.pending_discovery_prs):
|
||||
def job(): Transport.handle_disovery_path_requests()
|
||||
threading.Thread(target=job).start()
|
||||
|
||||
|
||||
discovery_pr_tx_throttle = 0.5
|
||||
discovery_pr_tx_lock = Lock()
|
||||
discovery_pr_handle_lock = Lock()
|
||||
pending_discovery_prs = deque(maxlen=max_queued_discovery_prs)
|
||||
@staticmethod
|
||||
def handle_disovery_path_requests():
|
||||
if Transport.discovery_pr_handle_lock.locked(): return
|
||||
with Transport.discovery_pr_handle_lock:
|
||||
while len(Transport.pending_discovery_prs):
|
||||
time.sleep(Transport.discovery_pr_tx_throttle)
|
||||
destination_hash = None
|
||||
blocked_if = None
|
||||
with Transport.discovery_pr_tx_lock:
|
||||
entry = Transport.pending_discovery_prs.popleft()
|
||||
destination_hash = entry[0]
|
||||
blocked_if = entry[1]
|
||||
|
||||
if destination_hash:
|
||||
if blocked_if == None: Transport.request_path(destination_hash)
|
||||
else:
|
||||
for interface in Transport.interfaces:
|
||||
if interface != blocked_if: Transport.request_path(destination_hash, on_interface=interface)
|
||||
else: pass
|
||||
|
||||
|
||||
@staticmethod
|
||||
def handle_outgoing_announces(outgoing):
|
||||
for packet in sorted(outgoing, key=lambda p: p.hops): packet.send()
|
||||
|
||||
@staticmethod
|
||||
def transmit(interface, raw):
|
||||
@@ -1263,6 +1301,7 @@ class Transport:
|
||||
|
||||
Transport.transmit(interface, packet.raw)
|
||||
if packet.packet_type == RNS.Packet.ANNOUNCE: interface.sent_announce()
|
||||
if packet.destination.type == RNS.Destination.PLAIN and packet.is_outbound_pr: interface.sent_path_request()
|
||||
packet_sent(packet)
|
||||
sent = True
|
||||
|
||||
@@ -1629,10 +1668,12 @@ class Transport:
|
||||
# announces, queueing rebroadcasts of these, and removal
|
||||
# of queued announce rebroadcasts once handed to the next node.
|
||||
if packet.packet_type == RNS.Packet.ANNOUNCE:
|
||||
if interface != None and RNS.Identity.validate_announce(packet, only_validate_signature=True):
|
||||
interface.received_announce()
|
||||
announce_signature_valid = RNS.Identity.validate_announce(packet, only_validate_signature=True)
|
||||
if not announce_signature_valid: return
|
||||
elif interface != None: interface.received_announce()
|
||||
announced_destination_known = packet.destination_hash in Transport.path_table
|
||||
|
||||
if not packet.destination_hash in Transport.path_table:
|
||||
if not announced_destination_known:
|
||||
# This is an unknown destination, and we'll apply
|
||||
# potential ingress limiting. Already known
|
||||
# destinations will have re-announces controlled
|
||||
@@ -1694,7 +1735,7 @@ class Transport:
|
||||
random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10]
|
||||
random_blobs = []
|
||||
with Transport.inbound_announce_lock:
|
||||
if packet.destination_hash in Transport.path_table:
|
||||
if announced_destination_known:
|
||||
random_blobs = Transport.path_table[packet.destination_hash][IDX_PT_RANDBLOBS]
|
||||
|
||||
# If we already have a path to the announced
|
||||
@@ -2747,8 +2788,10 @@ class Transport:
|
||||
wait_time = (tx_time / on_interface.announce_cap)
|
||||
on_interface.announce_allowed_at = now + wait_time
|
||||
|
||||
packet.is_outbound_pr = True
|
||||
packet.send()
|
||||
Transport.path_requests[destination_hash] = time.time()
|
||||
|
||||
with Transport.path_requests_lock: Transport.path_requests[destination_hash] = time.time()
|
||||
|
||||
@staticmethod
|
||||
def remote_status_handler(path, data, request_id, link_id, remote_identity, requested_at):
|
||||
@@ -2832,6 +2875,7 @@ class Transport:
|
||||
|
||||
unique_tag = destination_hash+tag_bytes
|
||||
|
||||
if packet.receiving_interface: packet.receiving_interface.received_path_request()
|
||||
with Transport.discovery_pr_tags_lock:
|
||||
if not unique_tag in Transport.discovery_pr_tags:
|
||||
Transport.discovery_pr_tags.append(unique_tag)
|
||||
@@ -2964,9 +3008,12 @@ class Transport:
|
||||
|
||||
for interface in Transport.interfaces:
|
||||
if not interface == attached_interface:
|
||||
# Use the previously extracted tag from this path request
|
||||
# on the new path requests as well, to avoid potential loops
|
||||
Transport.request_path(destination_hash, on_interface=interface, tag=tag, recursive=True)
|
||||
if interface.should_egress_limit_pr():
|
||||
RNS.log(f"Not sending recursive path request on {interface} due to active egress limiting", RNS.LOG_DEBUG) if RNS.sl(RNS.LOG_DEBUG) else None
|
||||
else:
|
||||
# Use the previously extracted tag from this path request
|
||||
# on the new path requests as well, to avoid potential loops
|
||||
Transport.request_path(destination_hash, on_interface=interface, tag=tag, recursive=True)
|
||||
|
||||
elif not is_from_local_client and len(Transport.local_client_interfaces) > 0:
|
||||
# Forward the path request on all local
|
||||
|
||||
Reference in New Issue
Block a user