Refactoring work for free-threaded transport I/O

This commit is contained in:
Mark Qvist
2026-04-12 14:55:42 +02:00
parent fa353fb0b3
commit 23c0a493b1
4 changed files with 208 additions and 228 deletions

View File

@@ -421,8 +421,7 @@ class Destination:
else:
if packet.packet_type == RNS.Packet.DATA:
if self.callbacks.packet != None:
try:
self.callbacks.packet(plaintext, packet)
try: self.callbacks.packet(plaintext, packet)
except Exception as e:
RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

View File

@@ -119,8 +119,11 @@ class Transport:
discovery_pr_tags = [] # A table for keeping track of tagged path requests
max_pr_tags = 32000 # Maximum amount of unique path request tags to remember
destinations_lock = Lock()
inbound_announce_lock = Lock()
announce_table_lock = Lock()
announce_rate_table_lock = Lock()
announce_handler_lock = Lock()
path_table_lock = Lock()
reverse_table_lock = Lock()
link_table_lock = Lock()
@@ -156,8 +159,6 @@ class Transport:
pending_local_path_requests = {}
start_time = None
jobs_locked = False
jobs_running = False
hashlist_maxsize = 1000000
job_interval = 0.250
links_last_checked = 0.0
@@ -193,7 +194,6 @@ class Transport:
@staticmethod
def start(reticulum_instance):
Transport.jobs_running = True
Transport.owner = reticulum_instance
if Transport.identity == None:
@@ -262,7 +262,6 @@ class Transport:
Transport.last_mgmt_announce = time.time() - Transport.mgmt_announce_interval + 15
# Start job loops
Transport.jobs_running = False
threading.Thread(target=Transport.jobloop, daemon=True).start()
threading.Thread(target=Transport.count_traffic_loop, daemon=True).start()
@@ -473,7 +472,6 @@ class Transport:
outgoing = []
path_requests = {}
blocked_if = None
Transport.jobs_running = True
try:
with Transport.jobs_lock:
@@ -912,8 +910,6 @@ class Transport:
RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
Transport.jobs_running = False
for packet in outgoing: packet.send()
for destination_hash in path_requests:
@@ -966,10 +962,6 @@ class Transport:
@staticmethod
def outbound(packet):
while (Transport.jobs_running): sleep(0.0005)
Transport.jobs_locked = True
sent = False
outbound_time = time.time()
@@ -992,31 +984,37 @@ class Transport:
if generate_receipt:
packet.receipt = RNS.PacketReceipt(packet)
Transport.receipts.append(packet.receipt)
with Transport.receipts_lock: Transport.receipts.append(packet.receipt)
# TODO: Enable when caching has been redesigned
# Transport.cache(packet)
# Check if we have a known path for the destination in the path table
if packet.packet_type != RNS.Packet.ANNOUNCE and packet.destination.type != RNS.Destination.PLAIN and packet.destination.type != RNS.Destination.GROUP and packet.destination_hash in Transport.path_table:
outbound_interface = Transport.path_table[packet.destination_hash][IDX_PT_RVCD_IF]
with Transport.path_table_lock:
if not packet.destination_hash in Transport.path_table:
RNS.log(f"Dropped packet since path table entry disappeared during outbound processing", RNS.LOG_WARNING)
return False
else: path_entry = Transport.path_table[packet.destination_hash]
outbound_interface = path_entry[IDX_PT_RVCD_IF]
# If there's more than one hop to the destination, and we know
# a path, we insert the packet into transport by adding the next
# transport nodes address to the header, and modifying the flags.
# This rule applies both for "normal" transport, and when connected
# to a local shared Reticulum instance.
if Transport.path_table[packet.destination_hash][IDX_PT_HOPS] > 1:
if path_entry[IDX_PT_HOPS] > 1:
if packet.header_type == RNS.Packet.HEADER_1:
# Insert packet into transport
new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111)
new_raw = struct.pack("!B", new_flags)
new_raw += packet.raw[1:2]
new_raw += Transport.path_table[packet.destination_hash][IDX_PT_NEXT_HOP]
new_raw += path_entry[IDX_PT_NEXT_HOP]
new_raw += packet.raw[2:]
packet_sent(packet)
Transport.transmit(outbound_interface, new_raw)
Transport.path_table[packet.destination_hash][IDX_PT_TIMESTAMP] = time.time()
path_entry[IDX_PT_TIMESTAMP] = time.time()
sent = True
# In the special case where we are connected to a local shared
@@ -1026,17 +1024,17 @@ class Transport:
# one hop away would just be broadcast directly, but since we
# are "behind" a shared instance, we need to get that instance
# to transport it onto the network.
elif Transport.path_table[packet.destination_hash][IDX_PT_HOPS] == 1 and Transport.owner.is_connected_to_shared_instance:
elif path_entry[IDX_PT_HOPS] == 1 and Transport.owner.is_connected_to_shared_instance:
if packet.header_type == RNS.Packet.HEADER_1:
# Insert packet into transport
new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111)
new_raw = struct.pack("!B", new_flags)
new_raw += packet.raw[1:2]
new_raw += Transport.path_table[packet.destination_hash][IDX_PT_NEXT_HOP]
new_raw += path_entry[IDX_PT_NEXT_HOP]
new_raw += packet.raw[2:]
packet_sent(packet)
Transport.transmit(outbound_interface, new_raw)
Transport.path_table[packet.destination_hash][IDX_PT_TIMESTAMP] = time.time()
path_entry[IDX_PT_TIMESTAMP] = time.time()
sent = True
# If none of the above applies, we know the destination is
@@ -1058,10 +1056,8 @@ class Transport:
should_transmit = True
if packet.destination.type == RNS.Destination.LINK:
if packet.destination.status == RNS.Link.CLOSED:
should_transmit = False
if interface != packet.destination.attached_interface:
should_transmit = False
if packet.destination.status == RNS.Link.CLOSED: should_transmit = False
if interface != packet.destination.attached_interface: should_transmit = False
if packet.attached_interface != None and interface != packet.attached_interface:
should_transmit = False
@@ -1073,7 +1069,9 @@ class Transport:
should_transmit = False
elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
with Transport.destinations_lock:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
if local_destination != None:
# RNS.log("Allowing announce broadcast on roaming-mode interface from instance-local destination", RNS.LOG_EXTREME)
pass
@@ -1094,7 +1092,8 @@ class Transport:
should_transmit = False
elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
with Transport.destinations_lock:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
if local_destination != None:
# RNS.log("Allowing announce broadcast on boundary-mode interface from instance-local destination", RNS.LOG_EXTREME)
pass
@@ -1117,14 +1116,9 @@ class Transport:
# TODO: Rethink whether this is actually optimal.
if packet.hops > 0:
if not hasattr(interface, "announce_cap"):
interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP
if not hasattr(interface, "announce_allowed_at"):
interface.announce_allowed_at = 0
if not hasattr(interface, "announce_queue"):
interface.announce_queue = []
if not hasattr(interface, "announce_cap"): interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP
if not hasattr(interface, "announce_allowed_at"): interface.announce_allowed_at = 0
if not hasattr(interface, "announce_queue"): interface.announce_queue = []
queued_announces = True if len(interface.announce_queue) > 0 else False
if not queued_announces and outbound_time > interface.announce_allowed_at and interface.bitrate != None and interface.bitrate != 0:
@@ -1154,13 +1148,11 @@ class Transport:
e["raw"] = packet.raw
if should_queue:
entry = {
"destination": packet.destination_hash,
"time": outbound_time,
"hops": packet.hops,
"emitted": Transport.announce_emitted(packet),
"raw": packet.raw
}
entry = { "destination": packet.destination_hash,
"time": outbound_time,
"hops": packet.hops,
"emitted": Transport.announce_emitted(packet),
"raw": packet.raw }
queued_announces = True if len(interface.announce_queue) > 0 else False
interface.announce_queue.append(entry)
@@ -1170,10 +1162,8 @@ class Transport:
timer = threading.Timer(wait_time, interface.process_announce_queue)
timer.start()
if wait_time < 1:
wait_time_str = str(round(wait_time*1000,2))+"ms"
else:
wait_time_str = str(round(wait_time*1,2))+"s"
if wait_time < 1: wait_time_str = str(round(wait_time*1000,2))+"ms"
else: wait_time_str = str(round(wait_time*1,2))+"s"
ql_str = str(len(interface.announce_queue))
RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME)
@@ -1181,10 +1171,8 @@ class Transport:
else:
wait_time = max(interface.announce_allowed_at - time.time(), 0)
if wait_time < 1:
wait_time_str = str(round(wait_time*1000,2))+"ms"
else:
wait_time_str = str(round(wait_time*1,2))+"s"
if wait_time < 1: wait_time_str = str(round(wait_time*1000,2))+"ms"
else: wait_time_str = str(round(wait_time*1,2))+"s"
ql_str = str(len(interface.announce_queue))
RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME)
@@ -1199,12 +1187,10 @@ class Transport:
stored_hash = True
Transport.transmit(interface, packet.raw)
if packet.packet_type == RNS.Packet.ANNOUNCE:
interface.sent_announce()
if packet.packet_type == RNS.Packet.ANNOUNCE: interface.sent_announce()
packet_sent(packet)
sent = True
Transport.jobs_locked = False
return sent
@staticmethod
@@ -1253,8 +1239,7 @@ class Transport:
RNS.log("Dropped invalid GROUP announce packet", RNS.LOG_DEBUG)
return False
if not packet.packet_hash in Transport.packet_hashlist and not packet.packet_hash in Transport.packet_hashlist_prev:
return True
if not packet.packet_hash in Transport.packet_hashlist and not packet.packet_hash in Transport.packet_hashlist_prev: return True
else:
if packet.packet_type == RNS.Packet.ANNOUNCE:
if packet.destination_type == RNS.Destination.SINGLE:
@@ -1321,16 +1306,10 @@ class Transport:
else: return
while (Transport.jobs_running): sleep(0.0005)
if Transport.identity == None: return
Transport.jobs_locked = True
packet = RNS.Packet(None, raw)
if not packet.unpack():
Transport.jobs_locked = False
return
if not packet.unpack(): return
packet.receiving_interface = interface
packet.hops += 1
@@ -1430,9 +1409,7 @@ class Transport:
# it, do so and stop processing. Otherwise resume
# normal processing.
if packet.context == RNS.Packet.CACHE_REQUEST:
if Transport.cache_request_packet(packet):
Transport.jobs_locked = False
return
if Transport.cache_request_packet(packet): return
# If the packet is in transport, check whether we
# are the designated next hop, and process it
@@ -1503,7 +1480,7 @@ class Transport:
False, # 7: Validated
proof_timeout ] # 8: Proof timeout timestamp
Transport.link_table[RNS.Link.link_id_from_lr_packet(packet)] = link_entry
with Transport.link_table_lock: Transport.link_table[RNS.Link.link_id_from_lr_packet(packet)] = link_entry
else:
# Entry format is
@@ -1511,10 +1488,10 @@ class Transport:
outbound_interface, # 1: Outbound interface
time.time() ] # 2: Timestamp
Transport.reverse_table[packet.getTruncatedHash()] = reverse_entry
with Transport.reverse_table_lock: Transport.reverse_table[packet.getTruncatedHash()] = reverse_entry
Transport.transmit(outbound_interface, new_raw)
Transport.path_table[packet.destination_hash][IDX_PT_TIMESTAMP] = time.time()
with Transport.path_table_lock: Transport.path_table[packet.destination_hash][IDX_PT_TIMESTAMP] = time.time()
else:
# TODO: There should probably be some kind of REJECT
@@ -1561,8 +1538,7 @@ class Transport:
Transport.transmit(outbound_interface, new_raw)
Transport.link_table[packet.destination_hash][IDX_LT_TIMESTAMP] = time.time()
# TODO: Test and possibly enable this at some point
# Transport.jobs_locked = False
# TODO: Can we return safely here? Test and possibly enable this at some point.
# return
@@ -1580,10 +1556,10 @@ class Transport:
# by normal announce rate limiting.
if interface.should_ingress_limit():
interface.hold_announce(packet)
Transport.jobs_locked = False
return
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
with Transport.destinations_lock:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
if local_destination == None and RNS.Identity.validate_announce(packet):
if packet.transport_id != None:
received_from = packet.transport_id
@@ -1618,7 +1594,8 @@ class Transport:
# First, check that the announce is not for a destination
# local to this system, and that hops are less than the max
if (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1):
with Transport.destinations_lock: local_and_hops_condition = (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1)
if local_and_hops_condition:
announce_emitted = Transport.announce_emitted(packet)
random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10]
@@ -1700,36 +1677,38 @@ class Transport:
if should_add:
now = time.time()
is_from_local_client = Transport.from_local_client(packet)
rate_blocked = False
if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None:
if not packet.destination_hash in Transport.announce_rate_table:
rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]}
Transport.announce_rate_table[packet.destination_hash] = rate_entry
else:
rate_entry = Transport.announce_rate_table[packet.destination_hash]
rate_entry["timestamps"].append(now)
while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS:
rate_entry["timestamps"].pop(0)
current_rate = now - rate_entry["last"]
if now > rate_entry["blocked_until"]:
if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1
else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1)
if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace:
rate_target = packet.receiving_interface.announce_rate_target
rate_penalty = packet.receiving_interface.announce_rate_penalty
rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty
rate_blocked = True
else:
rate_entry["last"] = now
with Transport.announce_rate_table_lock:
if not packet.destination_hash in Transport.announce_rate_table:
rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]}
Transport.announce_rate_table[packet.destination_hash] = rate_entry
else:
rate_blocked = True
rate_entry = Transport.announce_rate_table[packet.destination_hash]
rate_entry["timestamps"].append(now)
while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS:
rate_entry["timestamps"].pop(0)
current_rate = now - rate_entry["last"]
if now > rate_entry["blocked_until"]:
if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1
else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1)
if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace:
rate_target = packet.receiving_interface.announce_rate_target
rate_penalty = packet.receiving_interface.announce_rate_penalty
rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty
rate_blocked = True
else:
rate_entry["last"] = now
else:
rate_blocked = True
retries = 0
@@ -1741,43 +1720,41 @@ class Transport:
retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW)
if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
expires = now + Transport.AP_PATH_TIME
expires = now + Transport.AP_PATH_TIME
elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
expires = now + Transport.ROAMING_PATH_TIME
expires = now + Transport.ROAMING_PATH_TIME
else:
expires = now + Transport.PATHFINDER_E
expires = now + Transport.PATHFINDER_E
if not random_blob in random_blobs:
random_blobs.append(random_blob)
random_blobs = random_blobs[-Transport.MAX_RANDOM_BLOBS:]
if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE:
if (RNS.Reticulum.transport_enabled() or is_from_local_client) and packet.context != RNS.Packet.PATH_RESPONSE:
# Insert announce into announce table for retransmission
if rate_blocked:
RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG)
if rate_blocked: RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG)
else:
if Transport.from_local_client(packet):
if is_from_local_client:
# If the announce is from a local client,
# it is announced immediately, but only one time.
retransmit_timeout = now
retries = Transport.PATHFINDER_R
Transport.announce_table[packet.destination_hash] = [
now, # 0: IDX_AT_TIMESTAMP
retransmit_timeout, # 1: IDX_AT_RTRNS_TMO
retries, # 2: IDX_AT_RETRIES
received_from, # 3: IDX_AT_RCVD_IF
announce_hops, # 4: IDX_AT_HOPS
packet, # 5: IDX_AT_PACKET
local_rebroadcasts, # 6: IDX_AT_LCL_RBRD
block_rebroadcasts, # 7: IDX_AT_BLCK_RBRD
attached_interface, # 8: IDX_AT_ATTCHD_IF
]
with Transport.announce_table_lock:
Transport.announce_table[packet.destination_hash] = [
now, # 0: IDX_AT_TIMESTAMP
retransmit_timeout, # 1: IDX_AT_RTRNS_TMO
retries, # 2: IDX_AT_RETRIES
received_from, # 3: IDX_AT_RCVD_IF
announce_hops, # 4: IDX_AT_HOPS
packet, # 5: IDX_AT_PACKET
local_rebroadcasts, # 6: IDX_AT_LCL_RBRD
block_rebroadcasts, # 7: IDX_AT_BLCK_RBRD
attached_interface, # 8: IDX_AT_ATTCHD_IF
]
# TODO: Check from_local_client once and store result
elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE:
elif is_from_local_client and packet.context == RNS.Packet.PATH_RESPONSE:
# If this is a path response from a local client,
# check if any external interfaces have pending
# path requests.
@@ -1787,17 +1764,18 @@ class Transport:
retransmit_timeout = now
retries = Transport.PATHFINDER_R
Transport.announce_table[packet.destination_hash] = [
now,
retransmit_timeout,
retries,
received_from,
announce_hops,
packet,
local_rebroadcasts,
block_rebroadcasts,
attached_interface
]
with Transport.announce_table_lock:
Transport.announce_table[packet.destination_hash] = [
now,
retransmit_timeout,
retries,
received_from,
announce_hops,
packet,
local_rebroadcasts,
block_rebroadcasts,
attached_interface
]
# If we have any local clients connected, we re-
# transmit the announce to them immediately
@@ -1810,13 +1788,13 @@ class Transport:
announce_data = packet.data
# TODO: Shouldn't the context be PATH_RESPONSE in the first case here?
if Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE:
if is_from_local_client and packet.context == RNS.Packet.PATH_RESPONSE:
for local_interface in Transport.local_client_interfaces:
if packet.receiving_interface != local_interface:
new_announce = RNS.Packet(
announce_destination,
announce_data,
RNS.Packet.ANNOUNCE,
RNS.Packet.ANNOUNCE, # <-- This one?
context = announce_context,
header_type = RNS.Packet.HEADER_2,
transport_type = Transport.TRANSPORT,
@@ -1880,73 +1858,75 @@ class Transport:
if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True, packet_type="announce")
path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash]
Transport.path_table[packet.destination_hash] = path_table_entry
with Transport.path_table_lock: Transport.path_table[packet.destination_hash] = path_table_entry
RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG)
# If the receiving interface is a tunnel, we add the
# announce to the tunnels table
if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None:
tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id]
paths = tunnel_entry[IDX_TT_PATHS]
paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash]
expires = time.time() + Transport.DESTINATION_TIMEOUT
tunnel_entry[IDX_TT_EXPIRES] = expires
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG)
with Transport.tunnels_lock:
tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id]
paths = tunnel_entry[IDX_TT_PATHS]
paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash]
expires = time.time() + Transport.DESTINATION_TIMEOUT
tunnel_entry[IDX_TT_EXPIRES] = expires
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG)
# Call externally registered callbacks from apps
# wanting to know when an announce arrives
for handler in Transport.announce_handlers:
try:
# Check that the announced destination matches
# the handlers aspect filter
execute_callback = False
announce_identity = RNS.Identity.recall(packet.destination_hash)
if handler.aspect_filter == None:
# If the handlers aspect filter is set to
# None, we execute the callback in all cases
execute_callback = True
else:
handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity)
if packet.destination_hash == handler_expected_hash: execute_callback = True
# If this is a path response, check whether the
# handler wants to receive it.
if packet.context == RNS.Packet.PATH_RESPONSE:
if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: pass
else: execute_callback = False
if execute_callback:
if len(inspect.signature(handler.received_announce).parameters) == 3:
def job():
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash))
threading.Thread(target=job, daemon=True).start()
elif len(inspect.signature(handler.received_announce).parameters) == 4:
def job():
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
announce_packet_hash = packet.packet_hash)
threading.Thread(target=job, daemon=True).start()
elif len(inspect.signature(handler.received_announce).parameters) == 5:
def job():
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
announce_packet_hash = packet.packet_hash,
is_path_response = packet.context == RNS.Packet.PATH_RESPONSE)
threading.Thread(target=job, daemon=True).start()
with Transport.announce_handler_lock:
for handler in Transport.announce_handlers:
try:
# Check that the announced destination matches
# the handlers aspect filter
execute_callback = False
announce_identity = RNS.Identity.recall(packet.destination_hash)
if handler.aspect_filter == None:
# If the handlers aspect filter is set to
# None, we execute the callback in all cases
execute_callback = True
else:
raise TypeError("Invalid signature for announce handler callback")
handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity)
if packet.destination_hash == handler_expected_hash: execute_callback = True
except Exception as e:
RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
# If this is a path response, check whether the
# handler wants to receive it.
if packet.context == RNS.Packet.PATH_RESPONSE:
if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: pass
else: execute_callback = False
if execute_callback:
if len(inspect.signature(handler.received_announce).parameters) == 3:
def job():
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash))
threading.Thread(target=job, daemon=True).start()
elif len(inspect.signature(handler.received_announce).parameters) == 4:
def job():
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
announce_packet_hash = packet.packet_hash)
threading.Thread(target=job, daemon=True).start()
elif len(inspect.signature(handler.received_announce).parameters) == 5:
def job():
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
announce_packet_hash = packet.packet_hash,
is_path_response = packet.context == RNS.Packet.PATH_RESPONSE)
threading.Thread(target=job, daemon=True).start()
else:
raise TypeError("Invalid signature for announce handler callback")
except Exception as e:
RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
# Handling for link requests to local destinations
elif packet.packet_type == RNS.Packet.LINKREQUEST:
@@ -1992,10 +1972,9 @@ class Transport:
cached_packet.unpack()
RNS.Packet(destination=link, data=cached_packet.data,
packet_type=cached_packet.packet_type, context=cached_packet.context).send()
Transport.jobs_locked = False
else:
link.receive(packet)
else: link.receive(packet)
else:
# In the strange and rare case that an interface
# is partly malfunctioning, and a link-associated
@@ -2011,14 +1990,12 @@ class Transport:
if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
packet.destination = destination
if destination.receive(packet):
if destination.proof_strategy == RNS.Destination.PROVE_ALL:
packet.prove()
if destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove()
elif destination.proof_strategy == RNS.Destination.PROVE_APP:
if destination.callbacks.proof_requested:
try:
if destination.callbacks.proof_requested(packet):
packet.prove()
if destination.callbacks.proof_requested(packet): packet.prove()
except Exception as e:
RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
@@ -2128,8 +2105,6 @@ class Transport:
if receipt in Transport.receipts:
Transport.receipts.remove(receipt)
Transport.jobs_locked = False
@staticmethod
def synthesize_tunnel(interface):
interface_hash = interface.get_hash()
@@ -2234,11 +2209,12 @@ class Transport:
def register_destination(destination):
destination.MTU = RNS.Reticulum.MTU
if destination.direction == RNS.Destination.IN:
for registered_destination in Transport.destinations:
if destination.hash == registered_destination.hash:
raise KeyError("Attempt to register an already registered destination.")
Transport.destinations.append(destination)
with Transport.destinations_lock:
for registered_destination in Transport.destinations:
if destination.hash == registered_destination.hash:
raise KeyError("Attempt to register an already registered destination.")
Transport.destinations.append(destination)
if Transport.owner.is_connected_to_shared_instance:
if destination.type == RNS.Destination.SINGLE:
@@ -2249,28 +2225,28 @@ class Transport:
@staticmethod
def deregister_destination(destination):
if destination in Transport.destinations:
Transport.destinations.remove(destination)
with Transport.destinations_lock:
if destination in Transport.destinations: Transport.destinations.remove(destination)
@staticmethod
def register_link(link):
RNS.log("Registering link "+str(link), RNS.LOG_EXTREME)
if link.initiator:
Transport.pending_links.append(link)
with Transport.pending_links_lock: Transport.pending_links.append(link)
else:
Transport.active_links.append(link)
with Transport.active_links_lock: Transport.active_links.append(link)
@staticmethod
def activate_link(link):
RNS.log("Activating link "+str(link), RNS.LOG_EXTREME)
if link in Transport.pending_links:
if link.status != RNS.Link.ACTIVE:
raise IOError("Invalid link state for link activation: "+str(link.status))
Transport.pending_links.remove(link)
Transport.active_links.append(link)
link.status = RNS.Link.ACTIVE
else:
RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR)
with Transport.pending_links_lock:
if link in Transport.pending_links:
if link.status != RNS.Link.ACTIVE: raise IOError("Invalid link state for link activation: "+str(link.status))
Transport.pending_links.remove(link)
with Transport.active_links_lock: Transport.active_links.append(link)
link.status = RNS.Link.ACTIVE
else:
RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR)
@staticmethod
def register_announce_handler(handler):
@@ -2283,9 +2259,9 @@ class Transport:
optionally have a *receive_path_responses* attribute set to ``True``, to also receive all path responses, in addition to live
announces. See the :ref:`Announce Example<example-announce>` for more info.
"""
if hasattr(handler, "received_announce") and callable(handler.received_announce):
if hasattr(handler, "aspect_filter"):
Transport.announce_handlers.append(handler)
with Transport.announce_handler_lock:
if hasattr(handler, "received_announce") and callable(handler.received_announce):
if hasattr(handler, "aspect_filter"): Transport.announce_handlers.append(handler)
@staticmethod
def deregister_announce_handler(handler):
@@ -2294,7 +2270,9 @@ class Transport:
:param handler: The announce handler to be deregistered.
"""
while handler in Transport.announce_handlers: Transport.announce_handlers.remove(handler)
with Transport.announce_handler_lock:
while handler in Transport.announce_handlers: Transport.announce_handlers.remove(handler)
gc.collect()
@staticmethod
@@ -2434,8 +2412,9 @@ class Transport:
:param destination_hash: A destination hash as *bytes*.
:returns: The number of hops to the specified destination, or ``RNS.Transport.PATHFINDER_M`` if the number of hops is unknown.
"""
if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_HOPS]
else: return Transport.PATHFINDER_M
with Transport.path_table_lock:
if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_HOPS]
else: return Transport.PATHFINDER_M
@staticmethod
def next_hop(destination_hash):
@@ -2443,8 +2422,9 @@ class Transport:
:param destination_hash: A destination hash as *bytes*.
:returns: The destination hash as *bytes* for the next hop to the specified destination, or *None* if the next hop is unknown.
"""
if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_NEXT_HOP]
else: return None
with Transport.path_table_lock:
if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_NEXT_HOP]
else: return None
@staticmethod
def next_hop_interface(destination_hash):
@@ -2452,8 +2432,9 @@ class Transport:
:param destination_hash: A destination hash as *bytes*.
:returns: The interface for the next hop to the specified destination, or *None* if the interface is unknown.
"""
if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_RVCD_IF]
else: return None
with Transport.path_table_lock:
if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_RVCD_IF]
else: return None
@staticmethod
def next_hop_interface_bitrate(destination_hash):

View File

@@ -1 +1 @@
__version__ = "1.1.4"
__version__ = "1.1.5"

View File

@@ -768,7 +768,7 @@ class TestLink(unittest.TestCase):
data = bytearray()
for rx in received:
data.extend(rx)
if rx: data.extend(rx)
rx_message = data
print(f"Received {len(received)} chunks, totalling {len(rx_message)} bytes")