From 928c02099b83ef733f063099577d96d950ec0181 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 7 May 2026 14:11:49 +0200 Subject: [PATCH] Refactoring work for rnid --- RNS/Cryptography/Ed25519.py | 1 - RNS/Cryptography/Hashes.py | 4 + RNS/Identity.py | 22 +- RNS/Utilities/rnid.py | 1238 +++++++++++++++++++---------------- 4 files changed, 709 insertions(+), 556 deletions(-) diff --git a/RNS/Cryptography/Ed25519.py b/RNS/Cryptography/Ed25519.py index b7ffccd9..b72eb602 100644 --- a/RNS/Cryptography/Ed25519.py +++ b/RNS/Cryptography/Ed25519.py @@ -35,7 +35,6 @@ class Ed25519PrivateKey: def __init__(self, seed): self.seed = seed self.sk = ed25519.SigningKey(self.seed) - #self.vk = self.sk.get_verifying_key() @classmethod def generate(cls): diff --git a/RNS/Cryptography/Hashes.py b/RNS/Cryptography/Hashes.py index 02b85144..a96c8e1d 100644 --- a/RNS/Cryptography/Hashes.py +++ b/RNS/Cryptography/Hashes.py @@ -62,3 +62,7 @@ def sha512(data): digest.update(data) return digest.digest() + +def file_sha256(file): + if not hashlib: raise SystemError("The hashlib module is not available on this system") + else: return hashlib.file_digest(file, "sha256").digest() diff --git a/RNS/Identity.py b/RNS/Identity.py index 6a4fa914..7dded286 100644 --- a/RNS/Identity.py +++ b/RNS/Identity.py @@ -656,6 +656,22 @@ class Identity: RNS.log("Error while saving identity to "+str(path), RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e)) + def pub_to_file(self, path): + """ + Saves the public identity to a file. + + :param path: The full path specifying where to save the identity. + :returns: True if the file was saved, otherwise False. + """ + try: + with open(path, "wb") as key_file: + key_file.write(self.get_public_key()) + return True + return False + except Exception as e: + RNS.log("Error while saving identity to "+str(path), RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e)) + def __init__(self,create_keys=True): # Initialize keys to none self.prv = None @@ -695,13 +711,15 @@ class Identity: """ :returns: The private key as *bytes* """ - return self.prv_bytes+self.sig_prv_bytes + if self.prv_bytes and self.sig_prv_bytes: return self.prv_bytes+self.sig_prv_bytes + else: return None def get_public_key(self): """ :returns: The public key as *bytes* """ - return self.pub_bytes+self.sig_pub_bytes + if self.pub_bytes and self.sig_pub_bytes: return self.pub_bytes+self.sig_pub_bytes + else: return None def load_private_key(self, prv_bytes): """ diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index a8de76c6..0fb119f1 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -35,21 +35,534 @@ import argparse import time import sys import os +import io import base64 from RNS._version import __version__ +from RNS.vendor.umsgpack import umsgpack as mp +from RNS.Cryptography.Hashes import sha256 +from RNS.Cryptography.Hashes import file_sha256 -APP_NAME = "rnid" +APP_NAME = "rns" +DEFAULT_ASPECTS = f"{APP_NAME}.id" -SIG_EXT = "rsg" +PRV_EXT = "rid" +PUB_EXT = "pub" +SIG_EXT = "rsg" ENCRYPT_EXT = "rfe" -CHUNK_SIZE = 16*1024*1024 +CHUNK_SIZE = 16*1024*1024 + +RSG_HASHTYPES = ["sha256"] + +R_OK = 0 +R_FILE_EXISTS = 1 +R_NO_FILE = 1 +R_NO_SIG_FILE = 1 +R_NO_PUBKEY = 2 +R_NO_PRVKEY = 3 +R_NO_KEYS = 4 +R_WRITE_ERROR = 5 +R_INVALID_ASPECTS = 4 +R_INVALID_SIGNATURE = 128 +R_READ_ERROR = 252 +R_WRITE_ERROR = 253 +R_UNKNOWN_ERROR = 254 +R_INTERRUPTED = 255 + +reticulum = None + +def validate_args(args): + ops = 0; + for o in [args.encrypt, args.decrypt, args.validate, args.sign]: + if o: ops += 1 + if ops > 1: print("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation"); exit(1) + + g = 0; + for a in [args.import_pub, args.import_prv, args.identity, args.generate]: + if a: g += 1 + if g > 1: print("The -i, -g, -m and -M args are mutually exclusive"); exit(1) + + g = 0; + for a in [args.base64, args.base32]: + if a: g += 1 + if g > 1: print("The -b and -B args are mutually exclusive"); exit(1) + + return True + +def main(): + try: + parser = argparse.ArgumentParser(description="Reticulum Identity & Encryption Utility") + + # Identity Resolution + parser.add_argument("--config", metavar="path", action="store", default=None, help="path to alternative Reticulum config directory", type=str) + parser.add_argument("-i", "--identity", metavar="rid", action="store", default=None, help="hexadecimal Reticulum identity or destination hash, or path to Identity file", type=str) + parser.add_argument("-g", "--generate", metavar="path", action="store", default=None, help="generate a new Identity and save to path") + parser.add_argument("-m", "--import-pub", dest="import_pub", metavar="rid", action="store", default=None, help="import public Reticulum identity in hex, base32 or base64 format, or from file", type=str) + parser.add_argument("-M", "--import-prv", dest="import_prv", metavar="rid", action="store", default=None, help="import Reticulum identity in hex, base32 or base64 format, or from file", type=str) + parser.add_argument("-x", "--export-pub", action="store_true", default=None, help="export public identity to hex, base32 or base64 format") + parser.add_argument("-X", "--export-prv", action="store_true", default=None, help="export private identity to hex, base32 or base64 format, or to file") + + # Verbosity Control + parser.add_argument("-v", "--verbose", action="count", default=0, help="increase verbosity") + parser.add_argument("-q", "--quiet", action="count", default=0, help="decrease verbosity") + + # Operations + parser.add_argument("-a", "--announce", metavar="aspects", action="store", nargs="?", const=DEFAULT_ASPECTS, default=None, help="announce a destination based on this Identity") + parser.add_argument("-H", "--hash", metavar="aspects", action="store", default=None, help="show destination hashes for other aspects for this Identity") + parser.add_argument("-d", "--decrypt", metavar="file", action="store", default=None, help="decrypt file") + parser.add_argument("-e", "--encrypt", metavar="file", action="store", default=None, help="encrypt file") + parser.add_argument("-V", "--validate", metavar="path", action="store", default=None, help="validate signature") + parser.add_argument("-s", "--sign", metavar="path", action="store", default=None, help="sign file") + parser.add_argument("--raw", action="store_true", default=False, help="sign raw input data instead of hashing first") + + # I/O Control + parser.add_argument("-r", "--read", metavar="file", action="store", default=None, help="input file path", type=str) + parser.add_argument("-w", "--write", metavar="file", action="store", default=None, help="output file path", type=str) + parser.add_argument("-f", "--force", action="store_true", default=None, help="write output even if it overwrites existing files") + parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # "read input from STDIN instead of file" + parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file", + + # Information Flow + parser.add_argument("-R", "--request", action="store_true", default=False, help="request unknown Identities from the network") + parser.add_argument("-t", action="store", metavar="seconds", type=float, help="identity request timeout before giving up", default=RNS.Transport.PATH_REQUEST_TIMEOUT) + parser.add_argument("-p", "--print-identity", action="store_true", default=False, help="print identity info and exit") + parser.add_argument("-P", "--print-private", action="store_true", default=False, help="allow displaying private keys") + + # Formatting Control + parser.add_argument("-b", "--base64", action="store_true", default=False, help="Use base64-encoded input and output") + parser.add_argument("-B", "--base32", action="store_true", default=False, help="Use base32-encoded input and output") + + parser.add_argument("--version", action="version", version="rnid {version}".format(version=__version__)) + + args = parser.parse_args() + validate_args(args) + + if not args.read: + if args.encrypt: args.read = args.encrypt + if args.decrypt: args.read = args.decrypt + if args.sign: args.read = args.sign + if args.validate: args.read = args.validate + + identity = get_operating_identity(args) + if not identity: print("Could not get working identity"); exit(9) + if args.print_identity: print_identity_information(args, identity) + if args.export_pub: export_pub_identity(args, identity) + if args.export_prv: export_prv_identity(args, identity) + if args.hash: print_hash_information(args, identity) + if args.write: write_identity(args, identity) + if args.announce: announce(args, identity) + if args.validate: validate(args, identity) + if args.sign: sign(args, identity) + if args.encrypt: encrypt(args, identity) + if args.decrypt: decrypt(args, identity) + + exit(0) + + except KeyboardInterrupt: print(""); exit(R_INTERRUPTED) + + +##################### +# Reticulum Helpers # +##################### + +def ensure_reticulum(args): + global reticulum + if not reticulum: + targetloglevel = 4; verbosity = args.verbose; quietness = args.quiet + if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness + reticulum = RNS.Reticulum(configdir=args.config, loglevel=targetloglevel) + RNS.compact_log_fmt = True + if args.stdout: RNS.loglevel = -1 + + +################################# +# Identity Loading & Resolution # +################################# + +def get_operating_identity(args): + global reticulum + identity = None + + if args.generate: + identity = RNS.Identity() + if not args.force and os.path.isfile(args.generate): + print("Identity file "+str(args.generate)+" already exists. Not overwriting.") + exit(R_FILE_EXISTS) + + else: + try: identity.to_file(args.generate); print(f"New identity {identity} written to {args.generate}") + except Exception as e: print(f"An error ocurred while saving the generated Identity: {e}"); exit(R_WRITE_ERROR) + + elif args.identity: + load_path = None + try: load_path = os.path.expanduser(args.identity) + except: pass + + if load_path and os.path.isfile(load_path): + # Attempt to load Identity from .rid file + try: + identity = RNS.Identity.from_file(load_path) + print(f"Loaded Identity {identity} from {load_path}") + if not identity.get_private_key() or not identity.get_public_key(): + raise SystemError("Missing key data in loaded identity") + + except Exception as e: + print(f"Could not load Identity from specified file: {e}") + exit(9) + + elif len(identity_str) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2: + # Attempt to recall Identity from hex-encoded hash + try: + ident_hash = bytes.fromhex(identity_str) + identity = RNS.Identity.recall(ident_hash) or RNS.Identity.recall(ident_hash, from_identity_hash=True) + + if identity == None: + if not args.request: + print("Could not recall Identity for "+RNS.prettyhexrep(ident_hash)+".") + print("You can query the network for unknown Identities with the -R option.") + exit(5) + + else: + ensure_reticulum(args) + RNS.Transport.request_path(ident_hash) + def spincheck(): return RNS.Identity.recall(ident_hash) != None + spin(spincheck, "Requesting unknown Identity for "+RNS.prettyhexrep(ident_hash), args.t) + + if not spincheck(): print("Identity request timed out"); exit(6) + else: + identity = RNS.Identity.recall(ident_hash) + print("Received Identity "+str(identity)+" for destination "+RNS.prettyhexrep(ident_hash)+" from the network") + + else: + ident_str = str(identity) + hash_str = RNS.prettyhexrep(ident_hash) + if ident_str == hash_str: print(f"Recalled Identity {ident_str}") + else: print(f"Recalled Identity {ident_str} for destination {hash_str}") + + + except Exception as e: print("Invalid hexadecimal hash provided"); exit(7) + + elif args.import_pub or args.import_prv: + prvsize = RNS.Identity.KEYSIZE//8 + pubsize = prvsize + identity_bytes = None + if args.import_pub: + try: + identity_bytes = None + import_path = os.path.expanduser(args.import_pub) + if os.path.isfile(import_path): + try: + with open(import_path, "rb") as fh: file_input = fh.read() + if file_input and len(file_input) == pubsize: + identity_bytes = file_input + print(f"Reticulum Identity imported from {import_path}") + except: pass + + if not identity_bytes: + if len(args.import_pub) == pubsize*2: + try: + identity_bytes = bytes.fromhex(args.import_pub) + print("Reticulum Identity imported from hex input") + except: pass + + if not identity_bytes: + try: + b32_decoded = base64.b32decode(args.import_pub) + if len(b32_decoded) == pubsize: + identity_bytes = b32_decoded + print("Reticulum Identity imported from base32 input") + except: pass + + if not identity_bytes: + try: + b64_decoded = base64.urlsafe_b64decode(args.import_pub) + if len(b64_decoded) == pubsize: + identity_bytes = b64_decoded + print("Reticulum Identity imported from base64 input") + except: pass + + if not identity_bytes: + print("Could not decode specified data to a valid public Reticulum Identity") + exit(41) + + except Exception as e: + print("Invalid identity data specified for private identity import: "+str(e)) + exit(41) + + elif args.import_prv: + try: + identity_bytes = None + import_path = os.path.expanduser(args.import_prv) + if os.path.isfile(import_path): + try: + with open(import_path, "rb") as fh: file_input = fh.read() + if file_input and len(file_input) == prvsize: + identity_bytes = file_input + print(f"Reticulum Identity imported from {import_path}") + except: pass + + if not identity_bytes: + if len(args.import_prv) == prvsize*2: + try: + identity_bytes = bytes.fromhex(args.import_prv) + print("Reticulum Identity imported from hex input") + except: pass + + if not identity_bytes: + try: + b32_decoded = base64.b32decode(args.import_prv) + if len(b32_decoded) == prvsize: + identity_bytes = b32_decoded + print("Reticulum Identity imported from base32 input") + except: pass + + if not identity_bytes: + try: + b64_decoded = base64.urlsafe_b64decode(args.import_prv) + if len(b64_decoded) == prvsize: + identity_bytes = b64_decoded + print("Reticulum Identity imported from base64 input") + except: pass + + if not identity_bytes: + print("Could not decode specified data to a valid private Reticulum Identity") + exit(41) + + except Exception as e: + print("Invalid identity data specified for private identity import: "+str(e)) + exit(41) + + if args.import_prv: + try: identity = RNS.Identity.from_bytes(identity_bytes) + except Exception as e: + print("Could not create Reticulum identity from specified data: "+str(e)) + exit(42) + + elif args.import_pub: + try: + identity = RNS.Identity(create_keys=False) + identity.load_public_key(identity_bytes) + except Exception as e: + print("Could not create Reticulum identity from specified data: "+str(e)) + exit(42) + + return identity + + +###################### +# Network Operations # +###################### + +def announce(args, identity): + try: + ensure_reticulum(args) + aspects = args.announce.split(".") + if not len(aspects) > 1: print("Invalid destination aspects specified"); exit(R_INVALID_ASPECTS) + else: + app_name = aspects[0]; aspects = aspects[1:] + if not identity.get_private_key(): print("Cannot announce this destination, since the private key is not held"); exit(R_NO_PRVKEY) + else: + destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, app_name, *aspects) + print(f"Announcing {args.announce} destination {RNS.prettyhexrep(destination.hash)} for identity {identity}") + destination.announce(); time.sleep(0.25) + + except Exception as e: print(f"An error ocurred while attempting to send the announce: {e}"); exit(R_UNKNOWN_ERROR) + + +############################ +# Cryptographic Operations # +############################ + +def read_rsg(rsg): + rsg_data = None + if type(rsg) == bytes: rsg_data = rsg + elif type(rsg) == io.BufferReader: rsg_data = rsg.read() + elif type(rsg) == str: + try: rsg_data = base64.urlsafe_b64decode(rsg) + except: pass + try: rsg_data = base64.b32decode(rsg) + except: pass + + if not rsg_data: return False + else: + siglen = Identity.SIGLENGTH//8 + if len(rsg_data) == siglen: return rsg_data, None + if len(rsg_data) < siglen+1: return False + else: + signature = rsg_data[:siglen] + envelope = rsg_data[siglen:] + + try: signed_data = mp.unpackb(envelope) + except: return False + + if not "hashtype" in signed_data or not "hash" in signed_data: return False, None + if not signed_data["hash"] in RSG_HASHTYPES: return False, None + if not "meta" in signed_data: return False, None + if not "signer" in signed_data["meta"]: return False, None + if not "pubkey" in signed_data["meta"]: return False, None + if not "note" in signed_data["meta"]: return False, None + + return signature, signed_data + +def create_rsg(signer_identity, signature_input, note=None, meta=None, output="bin"): + if not output in ["bin", "hex", "base32", "base64"]: raise TypeError(f"Invalid output format for rsg creation") + if not type(signer_identity) == RNS.Identity.Identity: raise TypeError(f"{signer_identity} is not a Reticulum Identity") + if not signer_identity.get_private_key(): raise ValueError(f"{signer_identity} does not hold a private key") + + if type(signature_input) == bytes: sha = sha256(signature_input) + elif type(signature_input) == str: sha = sha256(signature_input.encode("utf-8")) + elif type(signature_input) == io.BufferReader: sha = file_sha256(signature_input) + else: raise TypeError(f"Invalid input type {type(signature_input)} for rsg creation") + + signed_data = { "hashtype": "sha256", "hash": sha, + "meta": { "signer": identity.hash, + "pubkey": identity.get_public_key(), + "note" : note } } + + if meta and type(meta) == dict: + for key in meta: + if not key in signed_data["meta"]: signed_data["meta"]["key"] = meta["key"] + + envelope = mp.packb(signed_data) + signature = signer_identity.sign(envelope) + + return signature+envelope + +def validate(args, identity): + sig_ext = f".{SIG_EXT}" + validate_path = os.path.expanduser(args.validate) + path_is_sigfile = validate_path.lower().endswith(sig_ext) + if path_is_sigfile: signature_path = validate_path; file_path = validate_path[:-len(sig_ext)] + else: signature_path = f"{validate_path}{sig_ext}"; file_path = validate_path + signature_exists = os.path.isfile(signature_path) + file_exists = os.path.isfile(file_path) + + if not file_exists: print(f"The validation target \"{file_path}\" does not exist"); exit(R_NO_FILE) + if not signature_exists: print(f"No signature file exists for \"{file_path}\""); exit(R_NO_FILE) + + try: + with open(signature_path, "rb") as fh: signature = fh.read() + except Exception as e: print(f"Could not read signature: {e}"); exit(R_READ_ERROR) + + try: + with open(file_path, "rb") as fh: valid = identity.validate(signature, fh.read()) + if not valid: print(f"Invalid signature {signature_path} for file {file_path}"); exit(R_INVALID_SIGNATURE) + else: print(f"Signature is valid, the file {file_path} was signed by {identity}."); exit(R_OK) + + except Exception as e: print(f"Could not validate signature: {e}"); exit(R_READ_ERROR) + +def sign(args, identity): + sig_ext = f".{SIG_EXT}" + sign_path = os.path.expanduser(args.sign) + signature_path = f"{sign_path}{sig_ext}" + file_exists = os.path.isfile(sign_path) + signature_exists = os.path.isfile(signature_path) + + if not identity.get_private_key(): print(f"Cannot sign \"{sign_path}\", the identity does not hold a private key"); exit(R_NO_PRVKEY) + if not file_exists: print(f"The file \"{sign_path}\" does not exist"); exit(R_NO_FILE) + if signature_exists and not args.force: + print(f"The signature file \"{signature_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS) + + try: + if args.raw: + with open(sign_path, "rb") as fh: data = fh.read() + with open(signature_path, "wb") as fh: fh.write(identity.sign(data)) + + else: + with open(sign_path, "rb") as in_file: + with open(signature_path, "rb") as out_file: + out_file.write(create_rsg(identity, in_file)) + + print(f"Signed file {sign_path} with {identity}"); exit(R_OK) + + except Exception as e: print(f"Could not sign {sign_path}: {e}"); exit(R_UNKNOWN_ERROR) + +def encrypt(args, identity): + pass + +def decrypt(args, identity): + pass + +################ +# File Output # +################ + +def write_identity(args, identity): + try: + wp = os.path.expanduser(args.write) + args.write = False + if identity.get_private_key() and args.export_prv: + if not os.path.isfile(wp) or args.force: + identity.to_file(wp) + print("Wrote private identity to "+str(wp)) + else: print("File "+str(wp)+" already exists, not overwriting"); exit(R_FILE_EXISTS) + + elif identity.get_public_key(): + if not wp.lower().endswith(f".{PUB_EXT}"): wp += f".{PUB_EXT}" + if not os.path.isfile(wp) or args.force: + identity.pub_to_file(wp) + print("Wrote public identity to "+str(wp)) + else: print("File "+str(wp)+" already exists, not overwriting"); exit(R_FILE_EXISTS) + + else: print("Identity holds neither a public nor private key"); exit(R_NO_KEYS) + except Exception as e: print("Error while writing imported identity to file: "+str(e)); exit(R_WRITE_ERROR) + +################### +# Terminal Output # +################### + +def print_identity_information(args, identity): + print("Identity Hash : "+RNS.prettyhexrep(identity.hash)) + if args.base64: print("Public Key : "+base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8")) + elif args.base32: print("Public Key : "+base64.b32encode(identity.get_public_key()).decode("utf-8")) + else: print("Public Key : "+RNS.hexrep(identity.get_public_key(), delimit=False)) + + if identity.prv: + if args.print_private: + if args.base64: print("Private Key : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8")) + elif args.base32: print("Private Key : "+base64.b32encode(identity.get_private_key()).decode("utf-8")) + else: print("Private Key : "+RNS.hexrep(identity.get_private_key(), delimit=False)) + else: print("Private Key : Hidden") + +def print_hash_information(args, identity): + try: + aspects = args.hash.split(".") + if not len(aspects) > 0: print("Invalid destination aspects specified"); exit(R_INVALID_ASPECTS) + else: + app_name = aspects[0]; aspects = aspects[1:] + if not identity.get_public_key(): print("Identity does not hold a public key"); exit(R_NO_PUBKEY) + else: + destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) + print("The "+str(args.hash)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) + print("The full destination specifier is "+str(destination)) + + except Exception as e: print(f"An error ocurred while attempting to get hash information: {e}"); exit(R_UNKNOWN_ERROR) + +def export_pub_identity(args, identity): + k = identity.get_public_key() + if not k: print("Identity doesn't hold a public key, cannot export"); exit(R_NO_PUBKEY) + else: + if args.base64: print("Public Identity Keys : "+base64.urlsafe_b64encode(k).decode("utf-8")) + elif args.base32: print("Public Identity Keys : "+base64.b32encode(k).decode("utf-8")) + else: print("Public Identity Keys : "+RNS.hexrep(k, delimit=False)) + +def export_prv_identity(args, identity): + k = identity.get_private_key() + if not k: print("Identity doesn't hold a private key, cannot export"); exit(R_NO_PRVKEY) + else: + if args.base64: print("Private Identity Keys : "+base64.urlsafe_b64encode(k).decode("utf-8")) + elif args.base32: print("Private Identity Keys : "+base64.b32encode(k).decode("utf-8")) + else: print("Private Identity Keys : "+RNS.hexrep(k, delimit=False)) + + +############################## +# Helper & Utility Functions # +############################## def spin(until=None, msg=None, timeout=None): i = 0 syms = "⢄⢂⢁⡁⡈⡐⡠" - if timeout != None: - timeout = time.time()+timeout + if timeout != None: timeout = time.time()+timeout print(msg+" ", end=" ") while (timeout == None or time.time() timeout: - return False - else: - return True - -def main(): - try: - parser = argparse.ArgumentParser(description="Reticulum Identity & Encryption Utility") - # parser.add_argument("file", nargs="?", default=None, help="input file path", type=str) - - parser.add_argument("--config", metavar="path", action="store", default=None, help="path to alternative Reticulum config directory", type=str) - parser.add_argument("-i", "--identity", metavar="identity", action="store", default=None, help="hexadecimal Reticulum identity or destination hash, or path to Identity file", type=str) - parser.add_argument("-g", "--generate", metavar="file", action="store", default=None, help="generate a new Identity") - parser.add_argument("-m", "--import", dest="import_str", metavar="identity_data", action="store", default=None, help="import Reticulum identity in hex, base32 or base64 format", type=str) - parser.add_argument("-x", "--export", action="store_true", default=None, help="export identity to hex, base32 or base64 format") - - parser.add_argument("-v", "--verbose", action="count", default=0, help="increase verbosity") - parser.add_argument("-q", "--quiet", action="count", default=0, help="decrease verbosity") - - parser.add_argument("-a", "--announce", metavar="aspects", action="store", default=None, help="announce a destination based on this Identity") - parser.add_argument("-H", "--hash", metavar="aspects", action="store", default=None, help="show destination hashes for other aspects for this Identity") - parser.add_argument("-e", "--encrypt", metavar="file", action="store", default=None, help="encrypt file") - parser.add_argument("-d", "--decrypt", metavar="file", action="store", default=None, help="decrypt file") - parser.add_argument("-s", "--sign", metavar="path", action="store", default=None, help="sign file") - parser.add_argument("-V", "--validate", metavar="path", action="store", default=None, help="validate signature") - - parser.add_argument("-r", "--read", metavar="file", action="store", default=None, help="input file path", type=str) - parser.add_argument("-w", "--write", metavar="file", action="store", default=None, help="output file path", type=str) - parser.add_argument("-f", "--force", action="store_true", default=None, help="write output even if it overwrites existing files") - parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # "read input from STDIN instead of file" - parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file", - - parser.add_argument("-R", "--request", action="store_true", default=False, help="request unknown Identities from the network") - parser.add_argument("-t", action="store", metavar="seconds", type=float, help="identity request timeout before giving up", default=RNS.Transport.PATH_REQUEST_TIMEOUT) - parser.add_argument("-p", "--print-identity", action="store_true", default=False, help="print identity info and exit") - parser.add_argument("-P", "--print-private", action="store_true", default=False, help="allow displaying private keys") - - parser.add_argument("-b", "--base64", action="store_true", default=False, help="Use base64-encoded input and output") - parser.add_argument("-B", "--base32", action="store_true", default=False, help="Use base32-encoded input and output") - - parser.add_argument("--version", action="version", version="rnid {version}".format(version=__version__)) - - args = parser.parse_args() - - ops = 0; - for t in [args.encrypt, args.decrypt, args.validate, args.sign]: - if t: - ops += 1 - - if ops > 1: - RNS.log("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation", RNS.LOG_ERROR) - exit(1) - - if not args.read: - if args.encrypt: - args.read = args.encrypt - if args.decrypt: - args.read = args.decrypt - if args.sign: - args.read = args.sign - - identity_str = args.identity - if args.import_str: - identity_bytes = None - try: - if args.base64: - identity_bytes = base64.urlsafe_b64decode(args.import_str) - elif args.base32: - identity_bytes = base64.b32decode(args.import_str) - else: - identity_bytes = bytes.fromhex(args.import_str) - except Exception as e: - print("Invalid identity data specified for import: "+str(e)) - exit(41) - - try: - identity = RNS.Identity.from_bytes(identity_bytes) - except Exception as e: - print("Could not create Reticulum identity from specified data: "+str(e)) - exit(42) - - RNS.log("Identity imported") - if args.base64: - RNS.log("Public Key : "+base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8")) - elif args.base32: - RNS.log("Public Key : "+base64.b32encode(identity.get_public_key()).decode("utf-8")) - else: - RNS.log("Public Key : "+RNS.hexrep(identity.get_public_key(), delimit=False)) - if identity.prv: - if args.print_private: - if args.base64: - RNS.log("Private Key : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8")) - elif args.base32: - RNS.log("Private Key : "+base64.b32encode(identity.get_private_key()).decode("utf-8")) - else: - RNS.log("Private Key : "+RNS.hexrep(identity.get_private_key(), delimit=False)) - else: - RNS.log("Private Key : Hidden") - - if args.write: - try: - wp = os.path.expanduser(args.write) - if not os.path.isfile(wp) or args.force: - identity.to_file(wp) - RNS.log("Wrote imported identity to "+str(args.write)) - else: - print("File "+str(wp)+" already exists, not overwriting") - exit(43) - - except Exception as e: - print("Error while writing imported identity to file: "+str(e)) - exit(44) - - exit(0) - - if not args.generate and not identity_str: - print("\nNo identity provided, cannot continue\n") - parser.print_help() - print("") - exit(2) - - else: - targetloglevel = 4 - verbosity = args.verbose - quietness = args.quiet - if verbosity != 0 or quietness != 0: - targetloglevel = targetloglevel+verbosity-quietness - - # Start Reticulum - reticulum = RNS.Reticulum(configdir=args.config, loglevel=targetloglevel) - RNS.compact_log_fmt = True - if args.stdout: - RNS.loglevel = -1 - - if args.generate: - identity = RNS.Identity() - if not args.force and os.path.isfile(args.generate): - RNS.log("Identity file "+str(args.generate)+" already exists. Not overwriting.", RNS.LOG_ERROR) - exit(3) - else: - try: - identity.to_file(args.generate) - RNS.log(f"New identity {identity} written to {args.generate}") - exit(0) - except Exception as e: - RNS.log("An error ocurred while saving the generated Identity.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - exit(4) - - identity = None - if len(identity_str) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2 and not os.path.isfile(identity_str): - # Try recalling Identity from hex-encoded hash - try: - ident_hash = bytes.fromhex(identity_str) - identity = RNS.Identity.recall(ident_hash) or RNS.Identity.recall(ident_hash, from_identity_hash=True) - - if identity == None: - if not args.request: - RNS.log("Could not recall Identity for "+RNS.prettyhexrep(ident_hash)+".", RNS.LOG_ERROR) - RNS.log("You can query the network for unknown Identities with the -R option.", RNS.LOG_ERROR) - exit(5) - else: - RNS.Transport.request_path(ident_hash) - def spincheck(): - return RNS.Identity.recall(ident_hash) != None - spin(spincheck, "Requesting unknown Identity for "+RNS.prettyhexrep(ident_hash), args.t) - - if not spincheck(): - RNS.log("Identity request timed out", RNS.LOG_ERROR) - exit(6) - else: - identity = RNS.Identity.recall(ident_hash) - RNS.log("Received Identity "+str(identity)+" for destination "+RNS.prettyhexrep(ident_hash)+" from the network") - - else: - ident_str = str(identity) - hash_str = RNS.prettyhexrep(ident_hash) - if ident_str == hash_str: RNS.log(f"Recalled Identity {ident_str}") - else: RNS.log(f"Recalled Identity {ident_str} for destination {hash_str}") - - - except Exception as e: - RNS.log("Invalid hexadecimal hash provided", RNS.LOG_ERROR) - exit(7) - - - else: - # Try loading Identity from file - if not os.path.isfile(identity_str): - RNS.log("Specified Identity file not found") - exit(8) - else: - try: - identity = RNS.Identity.from_file(identity_str) - RNS.log("Loaded Identity "+str(identity)+" from "+str(identity_str)) - - except Exception as e: - RNS.log("Could not decode Identity from specified file") - exit(9) - - if identity != None: - if args.hash: - try: - aspects = args.hash.split(".") - if not len(aspects) > 0: - RNS.log("Invalid destination aspects specified", RNS.LOG_ERROR) - exit(32) - else: - app_name = aspects[0] - aspects = aspects[1:] - if identity.pub != None: - destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) - RNS.log("The "+str(args.hash)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) - RNS.log("The full destination specifier is "+str(destination)) - time.sleep(0.25) - exit(0) - else: - raise KeyError("No public key known") - except Exception as e: - RNS.log("An error ocurred while attempting to send the announce.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - - exit(0) - - if args.announce: - try: - aspects = args.announce.split(".") - if not len(aspects) > 1: - RNS.log("Invalid destination aspects specified", RNS.LOG_ERROR) - exit(32) - else: - app_name = aspects[0] - aspects = aspects[1:] - if identity.prv != None: - destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, app_name, *aspects) - RNS.log("Created destination "+str(destination)) - RNS.log("Announcing destination "+RNS.prettyhexrep(destination.hash)) - time.sleep(1.1) - destination.announce() - time.sleep(0.25) - exit(0) - else: - destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) - RNS.log("The "+str(args.announce)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) - RNS.log("The full destination specifier is "+str(destination)) - RNS.log("Cannot announce this destination, since the private key is not held") - time.sleep(0.25) - exit(33) - except Exception as e: - RNS.log("An error ocurred while attempting to send the announce.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - - exit(0) - - if args.print_identity: - if args.base64: - RNS.log("Public Key : "+base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8")) - elif args.base32: - RNS.log("Public Key : "+base64.b32encode(identity.get_public_key()).decode("utf-8")) - else: - RNS.log("Public Key : "+RNS.hexrep(identity.get_public_key(), delimit=False)) - if identity.prv: - if args.print_private: - if args.base64: - RNS.log("Private Key : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8")) - elif args.base32: - RNS.log("Private Key : "+base64.b32encode(identity.get_private_key()).decode("utf-8")) - else: - RNS.log("Private Key : "+RNS.hexrep(identity.get_private_key(), delimit=False)) - else: - RNS.log("Private Key : Hidden") - exit(0) - - if args.export: - if identity.prv: - if args.base64: - RNS.log("Exported Identity : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8")) - elif args.base32: - RNS.log("Exported Identity : "+base64.b32encode(identity.get_private_key()).decode("utf-8")) - else: - RNS.log("Exported Identity : "+RNS.hexrep(identity.get_private_key(), delimit=False)) - else: - RNS.log("Identity doesn't hold a private key, cannot export") - exit(50) - - exit(0) - - if args.validate: - if not args.read and args.validate.lower().endswith("."+SIG_EXT): - args.read = str(args.validate).replace("."+SIG_EXT, "") - - if not os.path.isfile(args.validate): - RNS.log("Signature file "+str(args.read)+" not found", RNS.LOG_ERROR) - exit(10) - - if not os.path.isfile(args.read): - RNS.log("Input file "+str(args.read)+" not found", RNS.LOG_ERROR) - exit(11) - - data_input = None - if args.read: - if not os.path.isfile(args.read): - RNS.log("Input file "+str(args.read)+" not found", RNS.LOG_ERROR) - exit(12) - else: - try: - data_input = open(args.read, "rb") - except Exception as e: - RNS.log("Could not open input file for reading", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - exit(13) - - # TODO: Actually expand this to a good solution - # probably need to create a wrapper that takes - # into account not closing stdin when done - # elif args.stdin: - # data_input = sys.stdin - - data_output = None - if args.encrypt and not args.write and not args.stdout and args.read: - args.write = str(args.read)+"."+ENCRYPT_EXT - - if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith("."+ENCRYPT_EXT): - args.write = str(args.read).replace("."+ENCRYPT_EXT, "") - - if args.sign and identity.prv == None: - RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR) - exit(14) - - if args.sign and not args.write and not args.stdout and args.read: - args.write = str(args.read)+"."+SIG_EXT - - if args.write: - if not args.force and os.path.isfile(args.write): - RNS.log("Output file "+str(args.write)+" already exists. Not overwriting.", RNS.LOG_ERROR) - exit(15) - else: - try: - data_output = open(args.write, "wb") - except Exception as e: - RNS.log("Could not open output file for writing", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - exit(15) - - # TODO: Actually expand this to a good solution - # probably need to create a wrapper that takes - # into account not closing stdout when done - # elif args.stdout: - # data_output = sys.stdout - - if args.sign: - if identity.prv == None: - RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR) - exit(16) - - if not data_input: - if not args.stdout: - RNS.log("Signing requested, but no input data specified", RNS.LOG_ERROR) - exit(17) - else: - if not data_output: - if not args.stdout: - RNS.log("Signing requested, but no output specified", RNS.LOG_ERROR) - exit(18) - - if not args.stdout: - RNS.log("Signing "+str(args.read)) - - try: - data_output.write(identity.sign(data_input.read())) - data_output.close() - data_input.close() - - if not args.stdout: - if args.read: - RNS.log("File "+str(args.read)+" signed with "+str(identity)+" to "+str(args.write)) - exit(0) - - except Exception as e: - if not args.stdout: - RNS.log("An error ocurred while encrypting data.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - try: - data_output.close() - except: - pass - try: - data_input.close() - except: - pass - exit(19) - - if args.validate: - if not data_input: - if not args.stdout: - RNS.log("Signature verification requested, but no input data specified", RNS.LOG_ERROR) - exit(20) - else: - # if not args.stdout: - # RNS.log("Verifying "+str(args.validate)+" for "+str(args.read)) - - try: - try: - sig_input = open(args.validate, "rb") - except Exception as e: - RNS.log("An error ocurred while opening "+str(args.validate)+".", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - exit(21) - - - validated = identity.validate(sig_input.read(), data_input.read()) - sig_input.close() - data_input.close() - - if not validated: - if not args.stdout: - RNS.log("Signature "+str(args.validate)+" for file "+str(args.read)+" is invalid", RNS.LOG_ERROR) - exit(22) - else: - if not args.stdout: - RNS.log("Signature "+str(args.validate)+" for file "+str(args.read)+" made by Identity "+str(identity)+" is valid") - exit(0) - - except Exception as e: - if not args.stdout: - RNS.log("An error ocurred while validating signature.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - try: - data_output.close() - except: - pass - try: - data_input.close() - except: - pass - exit(23) - - if args.encrypt: - if not data_input: - if not args.stdout: - RNS.log("Encryption requested, but no input data specified", RNS.LOG_ERROR) - exit(24) - else: - if not data_output: - if not args.stdout: - RNS.log("Encryption requested, but no output specified", RNS.LOG_ERROR) - exit(25) - - if not args.stdout: - RNS.log("Encrypting "+str(args.read)) - - try: - more_data = True - while more_data: - chunk = data_input.read(CHUNK_SIZE) - if chunk: - data_output.write(identity.encrypt(chunk)) - else: - more_data = False - data_output.close() - data_input.close() - if not args.stdout: - if args.read: - RNS.log("File "+str(args.read)+" encrypted for "+str(identity)+" to "+str(args.write)) - exit(0) - - except Exception as e: - if not args.stdout: - RNS.log("An error ocurred while encrypting data.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - try: - data_output.close() - except: - pass - try: - data_input.close() - except: - pass - exit(26) - - if args.decrypt: - if identity.prv == None: - RNS.log("Specified Identity does not hold a private key. Cannot decrypt.", RNS.LOG_ERROR) - exit(27) - - if not data_input: - if not args.stdout: - RNS.log("Decryption requested, but no input data specified", RNS.LOG_ERROR) - exit(28) - else: - if not data_output: - if not args.stdout: - RNS.log("Decryption requested, but no output specified", RNS.LOG_ERROR) - exit(29) - - if not args.stdout: - RNS.log("Decrypting "+str(args.read)+"...") - - try: - more_data = True - while more_data: - chunk = data_input.read(CHUNK_SIZE) - if chunk: - plaintext = identity.decrypt(chunk) - if plaintext == None: - if not args.stdout: - RNS.log("Data could not be decrypted with the specified Identity") - exit(30) - else: - data_output.write(plaintext) - else: - more_data = False - data_output.close() - data_input.close() - if not args.stdout: - if args.read: - RNS.log("File "+str(args.read)+" decrypted with "+str(identity)+" to "+str(args.write)) - exit(0) - - except Exception as e: - if not args.stdout: - RNS.log("An error ocurred while decrypting data.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - try: - data_output.close() - except: - pass - try: - data_input.close() - except: - pass - exit(31) - - if True: - pass - - elif False: - pass - - else: - print("") - parser.print_help() - print("") - - except KeyboardInterrupt: - print("") - exit(255) + if timeout != None and time.time() > timeout: return False + else: return True if __name__ == "__main__": - main() \ No newline at end of file + main() + + +############ Legacy reference + +# if identity != None: + +# data_output = None +# if args.encrypt and not args.write and not args.stdout and args.read: +# args.write = str(args.read)+"."+ENCRYPT_EXT + +# if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith("."+ENCRYPT_EXT): +# args.write = str(args.read).replace("."+ENCRYPT_EXT, "") + +# if args.write: +# if not args.force and os.path.isfile(args.write): +# print("Output file "+str(args.write)+" already exists. Not overwriting.") +# exit(15) +# else: +# try: +# data_output = open(args.write, "wb") +# except Exception as e: +# print("Could not open output file for writing") +# print("The contained exception was: "+str(e)) +# exit(15) + +# # TODO: Actually expand this to a good solution +# # probably need to create a wrapper that takes +# # into account not closing stdout when done +# # elif args.stdout: +# # data_output = sys.stdout + +# if args.sign: +# if identity.prv == None: +# print("Specified Identity does not hold a private key. Cannot sign.") +# exit(16) + +# if not data_input: +# if not args.stdout: +# print("Signing requested, but no input data specified") +# exit(17) +# else: +# if not data_output: +# if not args.stdout: +# print("Signing requested, but no output specified") +# exit(18) + +# if not args.stdout: +# print("Signing "+str(args.read)) + +# try: +# data_output.write(identity.sign(data_input.read())) +# data_output.close() +# data_input.close() + +# if not args.stdout: +# if args.read: +# print("File "+str(args.read)+" signed with "+str(identity)+" to "+str(args.write)) +# exit(0) + +# except Exception as e: +# if not args.stdout: +# print("An error ocurred while encrypting data.") +# print("The contained exception was: "+str(e)) +# try: +# data_output.close() +# except: +# pass +# try: +# data_input.close() +# except: +# pass +# exit(19) + +# if args.encrypt: +# if not data_input: +# if not args.stdout: +# print("Encryption requested, but no input data specified") +# exit(24) +# else: +# if not data_output: +# if not args.stdout: +# print("Encryption requested, but no output specified") +# exit(25) + +# if not args.stdout: +# print("Encrypting "+str(args.read)) + +# try: +# more_data = True +# while more_data: +# chunk = data_input.read(CHUNK_SIZE) +# if chunk: +# data_output.write(identity.encrypt(chunk)) +# else: +# more_data = False +# data_output.close() +# data_input.close() +# if not args.stdout: +# if args.read: +# print("File "+str(args.read)+" encrypted for "+str(identity)+" to "+str(args.write)) +# exit(0) + +# except Exception as e: +# if not args.stdout: +# print("An error ocurred while encrypting data.") +# print("The contained exception was: "+str(e)) +# try: +# data_output.close() +# except: +# pass +# try: +# data_input.close() +# except: +# pass +# exit(26) + +# if args.decrypt: +# if identity.prv == None: +# print("Specified Identity does not hold a private key. Cannot decrypt.") +# exit(27) + +# if not data_input: +# if not args.stdout: +# print("Decryption requested, but no input data specified") +# exit(28) +# else: +# if not data_output: +# if not args.stdout: +# print("Decryption requested, but no output specified") +# exit(29) + +# if not args.stdout: +# print("Decrypting "+str(args.read)+"...") + +# try: +# more_data = True +# while more_data: +# chunk = data_input.read(CHUNK_SIZE) +# if chunk: +# plaintext = identity.decrypt(chunk) +# if plaintext == None: +# if not args.stdout: +# print("Data could not be decrypted with the specified Identity") +# exit(30) +# else: +# data_output.write(plaintext) +# else: +# more_data = False +# data_output.close() +# data_input.close() +# if not args.stdout: +# if args.read: +# print("File "+str(args.read)+" decrypted with "+str(identity)+" to "+str(args.write)) +# exit(0) + +# except Exception as e: +# if not args.stdout: +# print("An error ocurred while decrypting data.") +# print("The contained exception was: "+str(e)) +# try: data_output.close() +# except: pass +# try: data_input.close() +# except: pass +# exit(31) \ No newline at end of file