From 83d9ee1c5f6025722feb18a312acaeace7a65fd9 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 7 May 2026 17:19:46 +0200 Subject: [PATCH] Refactored rnid --- RNS/Identity.py | 1 + RNS/Utilities/rnid.py | 297 +++++++++++++++--------------------------- 2 files changed, 105 insertions(+), 193 deletions(-) diff --git a/RNS/Identity.py b/RNS/Identity.py index 7dded286..aae60807 100644 --- a/RNS/Identity.py +++ b/RNS/Identity.py @@ -76,6 +76,7 @@ class Identity: # Non-configurable constants TOKEN_OVERHEAD = RNS.Cryptography.Token.TOKEN_OVERHEAD AES128_BLOCKSIZE = 16 # In bytes + AES256_BLOCKSIZE = 16 # In bytes HASHLENGTH = 256 # In bits SIGLENGTH = KEYSIZE # In bits diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index bd2e296c..d59d386a 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -46,29 +46,33 @@ from RNS.Cryptography.Hashes import file_sha256 APP_NAME = "rns" DEFAULT_ASPECTS = f"{APP_NAME}.id" -PRV_EXT = "rid" -PUB_EXT = "pub" -SIG_EXT = "rsg" -ENCRYPT_EXT = "rfe" -CHUNK_SIZE = 16*1024*1024 +PRV_EXT = "rid" +PUB_EXT = "pub" +SIG_EXT = "rsg" +ENCRYPT_EXT = "rfe" +CHUNK_BLOCKS = 1024*1024 +ENC_CHUNK = CHUNK_BLOCKS*RNS.Identity.AES256_BLOCKSIZE +DEC_CHUNK = ENC_CHUNK + RNS.Cryptography.Token.TOKEN_OVERHEAD*2 RSG_HASHTYPES = ["sha256"] -R_OK = 0 -R_FILE_EXISTS = 1 -R_NO_FILE = 1 -R_NO_SIG_FILE = 1 -R_NO_IDENTITY = 1 -R_NO_PUBKEY = 2 -R_NO_PRVKEY = 3 -R_NO_KEYS = 4 -R_WRITE_ERROR = 5 -R_INVALID_ASPECTS = 4 -R_INVALID_SIGNATURE = 128 -R_READ_ERROR = 252 -R_WRITE_ERROR = 253 -R_UNKNOWN_ERROR = 254 -R_INTERRUPTED = 255 +R_OK = 0 +R_NO_SIG_FILE = 1 +R_NO_IDENTITY = 2 +R_NO_PUBKEY = 3 +R_NO_PRVKEY = 4 +R_NO_KEYS = 5 +R_NO_FILE = 6 +R_INVALID_FILE = 7 +R_INVALID_IDENTITY = 8 +R_INVALID_ASPECTS = 9 +R_INVALID_SIGNATURE = 10 +R_FILE_EXISTS = 11 +R_DECRYPT_FAILED = 12 +R_READ_ERROR = 252 +R_WRITE_ERROR = 253 +R_UNKNOWN_ERROR = 254 +R_INTERRUPTED = 255 reticulum = None @@ -117,11 +121,10 @@ def main(): parser.add_argument("--raw", action="store_true", default=False, help="sign raw input data instead of hashing first") # I/O Control - parser.add_argument("-r", "--read", metavar="file", action="store", default=None, help="input file path", type=str) parser.add_argument("-w", "--write", metavar="file", action="store", default=None, help="output file path", type=str) parser.add_argument("-f", "--force", action="store_true", default=None, help="write output even if it overwrites existing files") - parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # "read input from STDIN instead of file" - parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file", + parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # help="read input from STDIN instead of file" + parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file" # Information Flow parser.add_argument("-R", "--request", action="store_true", default=False, help="request unknown Identities from the network") @@ -147,12 +150,12 @@ def main(): if args.export_pub: export_pub_identity(args, identity) if args.export_prv: export_prv_identity(args, identity) if args.hash: print_hash_information(args, identity) - if args.write: write_identity(args, identity) if args.announce: announce(args, identity) if args.validate: validate(args, identity) if args.sign: sign(args, identity) if args.encrypt: encrypt(args, identity) if args.decrypt: decrypt(args, identity) + if args.write: write_identity(args, identity) exit(0) @@ -238,7 +241,7 @@ def get_operating_identity(args): else: print(f"Recalled Identity {ident_str} for destination {hash_str}") - except Exception as e: print("Invalid hexadecimal hash provided"); exit(7) + except Exception as e: print(f"Invalid hexadecimal hash provided: {e}"); exit(R_INVALID_IDENTITY) elif args.import_pub or args.import_prv: prvsize = RNS.Identity.KEYSIZE//8 @@ -367,9 +370,9 @@ def announce(args, identity): except Exception as e: print(f"An error ocurred while attempting to send the announce: {e}"); exit(R_UNKNOWN_ERROR) -############################ -# Cryptographic Operations # -############################ +################################### +# Signing & Validation Operations # +################################### def get_rsg_data(rsg): rsg_data = None @@ -536,11 +539,81 @@ def sign(args, identity): except Exception as e: print(f"Could not sign {sign_path}: {e}"); exit(R_UNKNOWN_ERROR) + +###################################### +# Encryption & Decryption Operations # +###################################### + def encrypt(args, identity): - pass + enc_ext = f".{ENCRYPT_EXT}" + encrypt_path = os.path.expanduser(args.encrypt) + rfe_path = args.write if args.write else f"{encrypt_path}{enc_ext}" + file_exists = os.path.isfile(encrypt_path) + rfe_exists = os.path.isfile(rfe_path) + + if not identity: print(f"Cannot encrypt \"{encrypt_path}\", no identity specified"); exit(R_NO_IDENTITY) + if not identity.get_public_key(): print(f"Cannot encrypt \"{encrypt_path}\", the identity does not hold a public key"); exit(R_NO_PUBKEY) + if not file_exists: print(f"The file \"{encrypt_path}\" does not exist"); exit(R_NO_FILE) + + if rfe_exists and not args.force: + print(f"The encryption output file \"{rfe_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS) + + try: + with open(encrypt_path, "rb") as input_fh: + try: + with open(rfe_path, "wb") as output_fh: + wrote = 0 + data_remaining = True + while data_remaining: + chunk = input_fh.read(ENC_CHUNK) + if chunk: wrote += output_fh.write(identity.encrypt(chunk)) + else: data_remaining = False + print(f"\rWrote {RNS.prettysize(wrote)} to {rfe_path} ", end="") + + except Exception as e: print(f"\nError writing encrypted output to {rfe_path}: {e}"); exit(R_WRITE_ERROR) + except Exception as e: print(f"\nError reading {encrypt_path} for encryption: {e}"); exit(R_WRITE_ERROR) + + print(f"\nFile {encrypt_path} encrypted for {identity} to {rfe_path}"); exit(R_OK) def decrypt(args, identity): - pass + enc_ext = f".{ENCRYPT_EXT}" + rfe_path = os.path.expanduser(args.decrypt) + if not rfe_path.endswith(enc_ext): print(f"The file {rfe_path} does not appear to be a Reticulum encrypted file"); exit(R_INVALID_FILE) + + decrypt_path = os.path.expanduser(args.write) if args.write else f"{rfe_path[:-len(enc_ext)]}" + if not decrypt_path: print(f"Invalid output filename"); exit(R_INVALID_FILE) + + rfe_exists = os.path.isfile(rfe_path) + file_exists = os.path.isfile(decrypt_path) + + if not identity: print(f"Cannot decrypt \"{rfe_path}\", no identity specified"); exit(R_NO_IDENTITY) + if not identity.get_private_key(): print(f"Cannot decrypt \"{rfe_path}\", the identity does not hold a private key"); exit(R_NO_PRVKEY) + if not rfe_exists: print(f"The file \"{rfe_path}\" does not exist"); exit(R_NO_FILE) + + if file_exists and not args.force: + print(f"The decryption output file \"{decrypt_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS) + + try: + with open(rfe_path, "rb") as input_fh: + try: + with open(decrypt_path, "wb") as output_fh: + data_remaining = True + wrote = 0 + while data_remaining: + chunk = input_fh.read(DEC_CHUNK) + if chunk: + decrypted = identity.decrypt(chunk) + if not decrypted: print(f"The provided identity could not decrypt the file"); exit(R_DECRYPT_FAILED) + else: wrote += output_fh.write(decrypted) + print(f"\rWrote {RNS.prettysize(wrote)} to {decrypt_path} ", end="") + + else: data_remaining = False + + except Exception as e: print(f"\nError writing decrypted output to {decrypt_path}: {e}"); exit(R_WRITE_ERROR) + except Exception as e: print(f"\nError reading {rfe_path} for decryption: {e}"); exit(R_WRITE_ERROR) + + print(f"\nFile {rfe_path} decrypted to {decrypt_path}"); exit(R_OK) + ################ # File Output # @@ -566,6 +639,7 @@ def write_identity(args, identity): else: print("Identity holds neither a public nor private key"); exit(R_NO_KEYS) except Exception as e: print("Error while writing imported identity to file: "+str(e)); exit(R_WRITE_ERROR) + ################### # Terminal Output # ################### @@ -637,166 +711,3 @@ def spin(until=None, msg=None, timeout=None): if __name__ == "__main__": main() - - -############ Legacy reference -# if identity != None: - -# data_output = None -# if args.encrypt and not args.write and not args.stdout and args.read: -# args.write = str(args.read)+"."+ENCRYPT_EXT - -# if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith("."+ENCRYPT_EXT): -# args.write = str(args.read).replace("."+ENCRYPT_EXT, "") - -# if args.write: -# if not args.force and os.path.isfile(args.write): -# print("Output file "+str(args.write)+" already exists. Not overwriting.") -# exit(15) -# else: -# try: -# data_output = open(args.write, "wb") -# except Exception as e: -# print("Could not open output file for writing") -# print("The contained exception was: "+str(e)) -# exit(15) - -# # TODO: Actually expand this to a good solution -# # probably need to create a wrapper that takes -# # into account not closing stdout when done -# # elif args.stdout: -# # data_output = sys.stdout - -# if args.sign: -# if identity.prv == None: -# print("Specified Identity does not hold a private key. Cannot sign.") -# exit(16) - -# if not data_input: -# if not args.stdout: -# print("Signing requested, but no input data specified") -# exit(17) -# else: -# if not data_output: -# if not args.stdout: -# print("Signing requested, but no output specified") -# exit(18) - -# if not args.stdout: -# print("Signing "+str(args.read)) - -# try: -# data_output.write(identity.sign(data_input.read())) -# data_output.close() -# data_input.close() - -# if not args.stdout: -# if args.read: -# print("File "+str(args.read)+" signed with "+str(identity)+" to "+str(args.write)) -# exit(0) - -# except Exception as e: -# if not args.stdout: -# print("An error ocurred while encrypting data.") -# print("The contained exception was: "+str(e)) -# try: -# data_output.close() -# except: -# pass -# try: -# data_input.close() -# except: -# pass -# exit(19) - -# if args.encrypt: -# if not data_input: -# if not args.stdout: -# print("Encryption requested, but no input data specified") -# exit(24) -# else: -# if not data_output: -# if not args.stdout: -# print("Encryption requested, but no output specified") -# exit(25) - -# if not args.stdout: -# print("Encrypting "+str(args.read)) - -# try: -# more_data = True -# while more_data: -# chunk = data_input.read(CHUNK_SIZE) -# if chunk: -# data_output.write(identity.encrypt(chunk)) -# else: -# more_data = False -# data_output.close() -# data_input.close() -# if not args.stdout: -# if args.read: -# print("File "+str(args.read)+" encrypted for "+str(identity)+" to "+str(args.write)) -# exit(0) - -# except Exception as e: -# if not args.stdout: -# print("An error ocurred while encrypting data.") -# print("The contained exception was: "+str(e)) -# try: -# data_output.close() -# except: -# pass -# try: -# data_input.close() -# except: -# pass -# exit(26) - -# if args.decrypt: -# if identity.prv == None: -# print("Specified Identity does not hold a private key. Cannot decrypt.") -# exit(27) - -# if not data_input: -# if not args.stdout: -# print("Decryption requested, but no input data specified") -# exit(28) -# else: -# if not data_output: -# if not args.stdout: -# print("Decryption requested, but no output specified") -# exit(29) - -# if not args.stdout: -# print("Decrypting "+str(args.read)+"...") - -# try: -# more_data = True -# while more_data: -# chunk = data_input.read(CHUNK_SIZE) -# if chunk: -# plaintext = identity.decrypt(chunk) -# if plaintext == None: -# if not args.stdout: -# print("Data could not be decrypted with the specified Identity") -# exit(30) -# else: -# data_output.write(plaintext) -# else: -# more_data = False -# data_output.close() -# data_input.close() -# if not args.stdout: -# if args.read: -# print("File "+str(args.read)+" decrypted with "+str(identity)+" to "+str(args.write)) -# exit(0) - -# except Exception as e: -# if not args.stdout: -# print("An error ocurred while decrypting data.") -# print("The contained exception was: "+str(e)) -# try: data_output.close() -# except: pass -# try: data_input.close() -# except: pass -# exit(31) \ No newline at end of file