Improved announce processing

This commit is contained in:
Mark Qvist
2026-01-01 14:51:33 +01:00
parent e56e80aade
commit 5392d635dd
2 changed files with 48 additions and 45 deletions

View File

@@ -467,7 +467,10 @@ class Transport:
completed_announces = [] completed_announces = []
for destination_hash in Transport.announce_table: for destination_hash in Transport.announce_table:
announce_entry = Transport.announce_table[destination_hash] announce_entry = Transport.announce_table[destination_hash]
if announce_entry[IDX_AT_RETRIES] > Transport.PATHFINDER_R: if announce_entry[IDX_AT_RETRIES] > 0 and announce_entry[IDX_AT_RETRIES] >= Transport.LOCAL_REBROADCASTS_MAX:
RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME)
completed_announces.append(destination_hash)
elif announce_entry[IDX_AT_RETRIES] > Transport.PATHFINDER_R:
RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", retry limit reached", RNS.LOG_EXTREME) RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", retry limit reached", RNS.LOG_EXTREME)
completed_announces.append(destination_hash) completed_announces.append(destination_hash)
else: else:
@@ -787,8 +790,7 @@ class Transport:
Transport.jobs_running = False Transport.jobs_running = False
for packet in outgoing: for packet in outgoing: packet.send()
packet.send()
for destination_hash in path_requests: for destination_hash in path_requests:
blocked_if = path_requests[destination_hash] blocked_if = path_requests[destination_hash]
@@ -845,8 +847,7 @@ class Transport:
@staticmethod @staticmethod
def outbound(packet): def outbound(packet):
while (Transport.jobs_running): while (Transport.jobs_running): sleep(0.0005)
sleep(0.0005)
Transport.jobs_locked = True Transport.jobs_locked = True
@@ -1491,17 +1492,17 @@ class Transport:
announce_entry = Transport.announce_table[packet.destination_hash] announce_entry = Transport.announce_table[packet.destination_hash]
if packet.hops-1 == announce_entry[IDX_AT_HOPS]: if packet.hops-1 == announce_entry[IDX_AT_HOPS]:
RNS.log("Heard a local rebroadcast of announce for "+RNS.prettyhexrep(packet.destination_hash), RNS.LOG_DEBUG) RNS.log(f"Heard a rebroadcast of announce for {RNS.prettyhexrep(packet.destination_hash)} on {packet.receiving_interface}", RNS.LOG_EXTREME)
announce_entry[IDX_AT_LCL_RBRD] += 1 announce_entry[IDX_AT_LCL_RBRD] += 1
if announce_entry[IDX_AT_LCL_RBRD] >= Transport.LOCAL_REBROADCASTS_MAX: if announce_entry[IDX_AT_RETRIES] > 0:
RNS.log("Max local rebroadcasts of announce for "+RNS.prettyhexrep(packet.destination_hash)+" reached, dropping announce from our table", RNS.LOG_DEBUG) if announce_entry[IDX_AT_LCL_RBRD] >= Transport.LOCAL_REBROADCASTS_MAX:
if packet.destination_hash in Transport.announce_table: RNS.log("Completed announce processing for "+RNS.prettyhexrep(packet.destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME)
Transport.announce_table.pop(packet.destination_hash) if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash)
if packet.hops-1 == announce_entry[IDX_AT_HOPS]+1 and announce_entry[IDX_AT_RETRIES] > 0: if packet.hops-1 == announce_entry[IDX_AT_HOPS]+1 and announce_entry[IDX_AT_RETRIES] > 0:
now = time.time() now = time.time()
if now < announce_entry[IDX_AT_RTRNS_TMO]: if now < announce_entry[IDX_AT_RTRNS_TMO]:
RNS.log("Rebroadcasted announce for "+RNS.prettyhexrep(packet.destination_hash)+" has been passed on to another node, no further tries needed", RNS.LOG_DEBUG) RNS.log("Rebroadcasted announce for "+RNS.prettyhexrep(packet.destination_hash)+" has been passed on to another node, no further tries needed", RNS.LOG_EXTREME)
if packet.destination_hash in Transport.announce_table: if packet.destination_hash in Transport.announce_table:
Transport.announce_table.pop(packet.destination_hash) Transport.announce_table.pop(packet.destination_hash)
@@ -1802,16 +1803,13 @@ class Transport:
execute_callback = True execute_callback = True
else: else:
handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity) handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity)
if packet.destination_hash == handler_expected_hash: if packet.destination_hash == handler_expected_hash: execute_callback = True
execute_callback = True
# If this is a path response, check whether the # If this is a path response, check whether the
# handler wants to receive it. # handler wants to receive it.
if packet.context == RNS.Packet.PATH_RESPONSE: if packet.context == RNS.Packet.PATH_RESPONSE:
if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: pass
pass else: execute_callback = False
else:
execute_callback = False
if execute_callback: if execute_callback:
if len(inspect.signature(handler.received_announce).parameters) == 3: if len(inspect.signature(handler.received_announce).parameters) == 3:
@@ -2903,38 +2901,41 @@ class Transport:
serialised_destinations = [] serialised_destinations = []
for destination_hash in Transport.path_table.copy(): for destination_hash in Transport.path_table.copy():
# Get the destination entry from the destination table try:
de = Transport.path_table[destination_hash]
interface_hash = de[IDX_PT_RVCD_IF].get_hash()
# Only store destination table entry if the associated
# interface is still active
interface = Transport.find_interface_from_hash(interface_hash)
if interface != None:
# Get the destination entry from the destination table # Get the destination entry from the destination table
de = Transport.path_table[destination_hash] de = Transport.path_table[destination_hash]
timestamp = de[IDX_PT_TIMESTAMP] interface_hash = de[IDX_PT_RVCD_IF].get_hash()
received_from = de[IDX_PT_NEXT_HOP]
hops = de[IDX_PT_HOPS]
expires = de[IDX_PT_EXPIRES]
random_blobs = de[IDX_PT_RANDBLOBS]
packet_hash = de[IDX_PT_PACKET]
serialised_entry = [ # Only store destination table entry if the associated
destination_hash, # interface is still active
timestamp, interface = Transport.find_interface_from_hash(interface_hash)
received_from, if interface != None:
hops, # Get the destination entry from the destination table
expires, de = Transport.path_table[destination_hash]
random_blobs, timestamp = de[IDX_PT_TIMESTAMP]
interface_hash, received_from = de[IDX_PT_NEXT_HOP]
packet_hash hops = de[IDX_PT_HOPS]
] expires = de[IDX_PT_EXPIRES]
random_blobs = de[IDX_PT_RANDBLOBS]
packet_hash = de[IDX_PT_PACKET]
serialised_destinations.append(serialised_entry) serialised_entry = [
destination_hash,
timestamp,
received_from,
hops,
expires,
random_blobs,
interface_hash,
packet_hash
]
# TODO: Reevaluate whether there is any cases where this is needed serialised_destinations.append(serialised_entry)
# Transport.cache(de[IDX_PT_PACKET], force_cache=True)
# TODO: Reevaluate whether there is any cases where this is needed
# Transport.cache(de[IDX_PT_PACKET], force_cache=True)
except Exception as e: RNS.log(f"Skipping persist for path table entry due to error: {e}", RNS.LOG_ERROR)
path_table_path = RNS.Reticulum.storagepath+"/destination_table" path_table_path = RNS.Reticulum.storagepath+"/destination_table"
file = open(path_table_path, "wb") file = open(path_table_path, "wb")

View File

@@ -143,7 +143,9 @@ def log(msg, level=3, _override_destination = False, pt=False):
with logging_lock: with logging_lock:
if (logdest == LOG_STDOUT or _always_override_destination or _override_destination): if (logdest == LOG_STDOUT or _always_override_destination or _override_destination):
if not threading.main_thread().is_alive(): return if not threading.main_thread().is_alive(): return
else: print(logstring) else:
try: print(logstring)
except: pass
elif (logdest == LOG_FILE and logfile != None): elif (logdest == LOG_FILE and logfile != None):
try: try: