Extended blackhole functionality to immediately terminate links if remote identifies as a blackholed identity

This commit is contained in:
Mark Qvist
2026-05-26 12:34:55 +02:00
parent e9609b7f25
commit d3fcc2a38c
2 changed files with 24 additions and 6 deletions
+9 -6
View File
@@ -1018,12 +1018,15 @@ class Link:
identity.load_public_key(public_key)
if identity.validate(signature, signed_data):
self.__remote_identity = identity
if self.callbacks.remote_identified != None:
try:
self.callbacks.remote_identified(self, self.__remote_identity)
except Exception as e:
RNS.log("Error while executing remote identified callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
if RNS.Reticulum.get_instance().is_blackholed(identity.hash):
RNS.log(f"Terminating incoming link from blackholed identity {RNS.prettyhexrep(identity.hash)}", RNS.LOG_DEBUG) if RNS.sl(RNS.LOG_DEBUG) else None
self.teardown()
else:
self.__remote_identity = identity
if self.callbacks.remote_identified != None:
try: self.callbacks.remote_identified(self, self.__remote_identity)
except Exception as e: RNS.log(f"Error while executing remote identified callback from {self}. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.__update_phy_stats(packet, query_shared=True)
+15
View File
@@ -1196,6 +1196,7 @@ class Reticulum:
if path == "packet_snr": rpc_connection.send(self.get_packet_snr(call["packet_hash"]))
if path == "packet_q": rpc_connection.send(self.get_packet_q(call["packet_hash"]))
if path == "blackholed_identities": rpc_connection.send(self.get_blackholed_identities())
if path == "is_blackholed": rpc_connection.send(self.is_blackholed(call["identity_hash"]))
if "drop" in call:
path = call["drop"]
@@ -1673,6 +1674,20 @@ class Reticulum:
else: return RNS.Transport.blackholed_identities
def is_blackholed(self, identity):
if type(identity) == RNS.Identity: identity_hash = identity.hash
elif type(identity) == bytes: identity_hash = identity
else: raise TypeError("Invalid identity for blackhole check, must be hash as bytes or RNS.Identity")
if len(identity_hash) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: raise ValueError("Invalid identity hash length for blackhole check")
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "is_blackholed", "identity_hash": identity_hash})
response = rpc_connection.recv()
return response
else: return identity_hash in RNS.Transport.blackholed_identities
def blackhole_identity(self, identity_hash, until=None, reason=None):
if len(identity_hash) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: return False
else: