From 8b6609c588e6089ead4e98dfd0aa8bb7d04ff599 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 7 May 2026 17:59:31 +0200 Subject: [PATCH] Improved rnid options and control flow --- RNS/Utilities/rnid.py | 95 ++++++++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 41 deletions(-) diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index 518f8bd1..dc1f0681 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -128,6 +128,7 @@ def main(): # Information Flow parser.add_argument("-R", "--request", action="store_true", default=False, help="request unknown Identities from the network") + parser.add_argument("-N", "--no-cache", action="store_true", default=False, help="never used cached or network-sourced information") parser.add_argument("-t", action="store", metavar="seconds", type=float, help="identity request timeout before giving up", default=RNS.Transport.PATH_REQUEST_TIMEOUT) parser.add_argument("-p", "--print-identity", action="store_true", default=False, help="print identity info and exit") parser.add_argument("-P", "--print-private", action="store_true", default=False, help="allow displaying private keys") @@ -141,15 +142,15 @@ def main(): args = parser.parse_args() validate_args(args) - op_requires_identity = (args.sign or args.encrypt or args.decrypt or args.hash or args.announce or args.write + op_requires_identity = (args.sign or args.encrypt or args.decrypt or args.announce or args.write or args.print_identity or args.print_identity or args.export_pub or args.export_prv) - identity = get_operating_identity(args); op = False + identity = get_operating_identity(args, allow_none=not op_requires_identity, no_cache=args.no_cache); op = False if not identity and op_requires_identity: print("Could not get working identity"); exit(R_NO_IDENTITY) if args.print_identity: print_identity_information(args, identity); op = True if args.export_pub: export_pub_identity(args, identity); op = True if args.export_prv: export_prv_identity(args, identity); op = True - if args.hash: print_hash_information(args, identity); op = True + if args.hash: print_hash_information(args, identity or args.identity); op = True if args.announce: announce(args, identity); op = True if args.validate: validate(args, identity); op = True if args.sign: sign(args, identity); op = True @@ -182,7 +183,7 @@ def ensure_reticulum(args): # Identity Loading & Resolution # ################################# -def get_operating_identity(args): +def get_operating_identity(args, allow_none=False, no_cache=False): global reticulum identity = None @@ -209,36 +210,45 @@ def get_operating_identity(args): if not identity.get_private_key() or not identity.get_public_key(): raise SystemError("Missing key data in loaded identity") - except Exception as e: - print(f"Could not load Identity from specified file: {e}") - exit(9) + except Exception as e: print(f"Could not load Identity from specified file: {e}"); exit(R_INVALID_IDENTITY) + + elif no_cache: + if allow_none: return None + else: print("Could not resolve identity"); exit(R_NO_IDENTITY) # Attempt to recall Identity from hex-encoded hash elif len(args.identity) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2: try: ensure_reticulum(args) - ident_hash = bytes.fromhex(args.identity) - identity = RNS.Identity.recall(ident_hash) or RNS.Identity.recall(ident_hash, from_identity_hash=True) + requested_hash = bytes.fromhex(args.identity) + identity = RNS.Identity.recall(requested_hash) or RNS.Identity.recall(requested_hash, from_identity_hash=True) if identity == None: - if not args.request: - print("Could not recall Identity for "+RNS.prettyhexrep(ident_hash)+".") + if allow_none and not args.request: return None + elif not args.request: + print("Could not recall Identity for "+RNS.prettyhexrep(requested_hash)+".") print("You can query the network for unknown Identities with the -R option.") - exit(5) + exit(R_NO_IDENTITY) else: - RNS.Transport.request_path(ident_hash) - def spincheck(): return RNS.Identity.recall(ident_hash) != None - spin(spincheck, "Requesting unknown Identity for "+RNS.prettyhexrep(ident_hash), args.t) + id_destination_hash = RNS.Destination.hash_from_name_and_identity(DEFAULT_ASPECTS, identity) + RNS.Transport.request_path(requested_hash) # Acquire if this was a destination hash + RNS.Transport.request_path(id_destination_hash) # Acquire if this was an identity hash + def spincheck(): return RNS.Identity.recall(requested_hash) != None + spin(spincheck, "Requesting unknown Identity for "+RNS.prettyhexrep(requested_hash), args.t) - if not spincheck(): print("Identity request timed out"); exit(6) + if not spincheck(): + print("Identity request timed out"); + if not allow_none: exit(R_NO_IDENTITY) + else: return None + else: - identity = RNS.Identity.recall(ident_hash) - print("Received Identity "+str(identity)+" for destination "+RNS.prettyhexrep(ident_hash)+" from the network") + identity = RNS.Identity.recall(requested_hash) + print("Received Identity "+str(identity)+" for destination "+RNS.prettyhexrep(requested_hash)+" from the network") else: ident_str = str(identity) - hash_str = RNS.prettyhexrep(ident_hash) + hash_str = RNS.prettyhexrep(requested_hash) if ident_str == hash_str: print(f"Recalled Identity {ident_str}") else: print(f"Recalled Identity {ident_str} for destination {hash_str}") @@ -284,13 +294,9 @@ def get_operating_identity(args): print("Reticulum Identity imported from base64 input") except: pass - if not identity_bytes: - print("Could not decode specified data to a valid public Reticulum Identity") - exit(41) + if not identity_bytes: print("Could not decode specified data to a valid public Reticulum Identity"); exit(R_INVALID_IDENTITY) - except Exception as e: - print("Invalid identity data specified for private identity import: "+str(e)) - exit(41) + except Exception as e: print("Invalid identity data specified for private identity import: "+str(e)); exit(R_INVALID_IDENTITY) elif args.import_prv: try: @@ -327,27 +333,19 @@ def get_operating_identity(args): print("Reticulum Identity imported from base64 input") except: pass - if not identity_bytes: - print("Could not decode specified data to a valid private Reticulum Identity") - exit(41) + if not identity_bytes: print("Could not decode specified data to a valid private Reticulum Identity"); exit(R_INVALID_IDENTITY) - except Exception as e: - print("Invalid identity data specified for private identity import: "+str(e)) - exit(41) + except Exception as e: print("Invalid identity data specified for private identity import: "+str(e)); exit(R_INVALID_IDENTITY) if args.import_prv: try: identity = RNS.Identity.from_bytes(identity_bytes) - except Exception as e: - print("Could not create Reticulum identity from specified data: "+str(e)) - exit(42) + except Exception as e: print("Could not create Reticulum identity from specified data: "+str(e)); exit(R_INVALID_IDENTITY) elif args.import_pub: try: identity = RNS.Identity(create_keys=False) identity.load_public_key(identity_bytes) - except Exception as e: - print("Could not create Reticulum identity from specified data: "+str(e)) - exit(42) + except Exception as e: print("Could not create Reticulum identity from specified data: "+str(e)); exit(R_INVALID_IDENTITY) return identity @@ -661,15 +659,30 @@ def print_identity_information(args, identity): def print_hash_information(args, identity): try: + hashlen = RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2 + if not type(identity) in [RNS.Identity, str]: print("Invalid identity"); exit(R_INVALID_IDENTITY) + if type(identity) == str and len(identity) != hashlen: print("Invalid identity hash length"); exit(R_INVALID_IDENTITY) + aspects = args.hash.split(".") if not len(aspects) > 0: print("Invalid destination aspects specified"); exit(R_INVALID_ASPECTS) else: app_name = aspects[0]; aspects = aspects[1:] - if not identity.get_public_key(): print("Identity does not hold a public key"); exit(R_NO_PUBKEY) + if type(identity) == RNS.Identity and not identity.get_public_key(): print("Identity does not hold a public key"); exit(R_NO_PUBKEY) else: - destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) - print("The "+str(args.hash)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) - print("The full destination specifier is "+str(destination)) + if type(identity) == RNS.Identity: + identity_hash = identity.hash + destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) + + elif type(identity) == str: + destination = None + try: identity_hash = bytes.fromhex(identity) + except Exception as e: print(f"Invalid identity: {e}"); exit(R_INVALID_IDENTITY) + + else: print("Invalid identity"); exit(R_INVALID_IDENTITY) + + destination_hash = RNS.Destination.hash_from_name_and_identity(args.hash, identity_hash) + print(f"The {args.hash} destination for this Identity is {RNS.prettyhexrep(destination_hash)}") + if destination: print("The full destination specifier is "+str(destination)) except Exception as e: print(f"An error ocurred while attempting to get hash information: {e}"); exit(R_UNKNOWN_ERROR)