Refactored rnid

This commit is contained in:
Mark Qvist
2026-05-07 17:19:46 +02:00
parent b527c59735
commit 83d9ee1c5f
2 changed files with 105 additions and 193 deletions
+1
View File
@@ -76,6 +76,7 @@ class Identity:
# Non-configurable constants
TOKEN_OVERHEAD = RNS.Cryptography.Token.TOKEN_OVERHEAD
AES128_BLOCKSIZE = 16 # In bytes
AES256_BLOCKSIZE = 16 # In bytes
HASHLENGTH = 256 # In bits
SIGLENGTH = KEYSIZE # In bits
+104 -193
View File
@@ -46,29 +46,33 @@ from RNS.Cryptography.Hashes import file_sha256
APP_NAME = "rns"
DEFAULT_ASPECTS = f"{APP_NAME}.id"
PRV_EXT = "rid"
PUB_EXT = "pub"
SIG_EXT = "rsg"
ENCRYPT_EXT = "rfe"
CHUNK_SIZE = 16*1024*1024
PRV_EXT = "rid"
PUB_EXT = "pub"
SIG_EXT = "rsg"
ENCRYPT_EXT = "rfe"
CHUNK_BLOCKS = 1024*1024
ENC_CHUNK = CHUNK_BLOCKS*RNS.Identity.AES256_BLOCKSIZE
DEC_CHUNK = ENC_CHUNK + RNS.Cryptography.Token.TOKEN_OVERHEAD*2
RSG_HASHTYPES = ["sha256"]
R_OK = 0
R_FILE_EXISTS = 1
R_NO_FILE = 1
R_NO_SIG_FILE = 1
R_NO_IDENTITY = 1
R_NO_PUBKEY = 2
R_NO_PRVKEY = 3
R_NO_KEYS = 4
R_WRITE_ERROR = 5
R_INVALID_ASPECTS = 4
R_INVALID_SIGNATURE = 128
R_READ_ERROR = 252
R_WRITE_ERROR = 253
R_UNKNOWN_ERROR = 254
R_INTERRUPTED = 255
R_OK = 0
R_NO_SIG_FILE = 1
R_NO_IDENTITY = 2
R_NO_PUBKEY = 3
R_NO_PRVKEY = 4
R_NO_KEYS = 5
R_NO_FILE = 6
R_INVALID_FILE = 7
R_INVALID_IDENTITY = 8
R_INVALID_ASPECTS = 9
R_INVALID_SIGNATURE = 10
R_FILE_EXISTS = 11
R_DECRYPT_FAILED = 12
R_READ_ERROR = 252
R_WRITE_ERROR = 253
R_UNKNOWN_ERROR = 254
R_INTERRUPTED = 255
reticulum = None
@@ -117,11 +121,10 @@ def main():
parser.add_argument("--raw", action="store_true", default=False, help="sign raw input data instead of hashing first")
# I/O Control
parser.add_argument("-r", "--read", metavar="file", action="store", default=None, help="input file path", type=str)
parser.add_argument("-w", "--write", metavar="file", action="store", default=None, help="output file path", type=str)
parser.add_argument("-f", "--force", action="store_true", default=None, help="write output even if it overwrites existing files")
parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # "read input from STDIN instead of file"
parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file",
parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # help="read input from STDIN instead of file"
parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file"
# Information Flow
parser.add_argument("-R", "--request", action="store_true", default=False, help="request unknown Identities from the network")
@@ -147,12 +150,12 @@ def main():
if args.export_pub: export_pub_identity(args, identity)
if args.export_prv: export_prv_identity(args, identity)
if args.hash: print_hash_information(args, identity)
if args.write: write_identity(args, identity)
if args.announce: announce(args, identity)
if args.validate: validate(args, identity)
if args.sign: sign(args, identity)
if args.encrypt: encrypt(args, identity)
if args.decrypt: decrypt(args, identity)
if args.write: write_identity(args, identity)
exit(0)
@@ -238,7 +241,7 @@ def get_operating_identity(args):
else: print(f"Recalled Identity {ident_str} for destination {hash_str}")
except Exception as e: print("Invalid hexadecimal hash provided"); exit(7)
except Exception as e: print(f"Invalid hexadecimal hash provided: {e}"); exit(R_INVALID_IDENTITY)
elif args.import_pub or args.import_prv:
prvsize = RNS.Identity.KEYSIZE//8
@@ -367,9 +370,9 @@ def announce(args, identity):
except Exception as e: print(f"An error ocurred while attempting to send the announce: {e}"); exit(R_UNKNOWN_ERROR)
############################
# Cryptographic Operations #
############################
###################################
# Signing & Validation Operations #
###################################
def get_rsg_data(rsg):
rsg_data = None
@@ -536,11 +539,81 @@ def sign(args, identity):
except Exception as e: print(f"Could not sign {sign_path}: {e}"); exit(R_UNKNOWN_ERROR)
######################################
# Encryption & Decryption Operations #
######################################
def encrypt(args, identity):
pass
enc_ext = f".{ENCRYPT_EXT}"
encrypt_path = os.path.expanduser(args.encrypt)
rfe_path = args.write if args.write else f"{encrypt_path}{enc_ext}"
file_exists = os.path.isfile(encrypt_path)
rfe_exists = os.path.isfile(rfe_path)
if not identity: print(f"Cannot encrypt \"{encrypt_path}\", no identity specified"); exit(R_NO_IDENTITY)
if not identity.get_public_key(): print(f"Cannot encrypt \"{encrypt_path}\", the identity does not hold a public key"); exit(R_NO_PUBKEY)
if not file_exists: print(f"The file \"{encrypt_path}\" does not exist"); exit(R_NO_FILE)
if rfe_exists and not args.force:
print(f"The encryption output file \"{rfe_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS)
try:
with open(encrypt_path, "rb") as input_fh:
try:
with open(rfe_path, "wb") as output_fh:
wrote = 0
data_remaining = True
while data_remaining:
chunk = input_fh.read(ENC_CHUNK)
if chunk: wrote += output_fh.write(identity.encrypt(chunk))
else: data_remaining = False
print(f"\rWrote {RNS.prettysize(wrote)} to {rfe_path} ", end="")
except Exception as e: print(f"\nError writing encrypted output to {rfe_path}: {e}"); exit(R_WRITE_ERROR)
except Exception as e: print(f"\nError reading {encrypt_path} for encryption: {e}"); exit(R_WRITE_ERROR)
print(f"\nFile {encrypt_path} encrypted for {identity} to {rfe_path}"); exit(R_OK)
def decrypt(args, identity):
pass
enc_ext = f".{ENCRYPT_EXT}"
rfe_path = os.path.expanduser(args.decrypt)
if not rfe_path.endswith(enc_ext): print(f"The file {rfe_path} does not appear to be a Reticulum encrypted file"); exit(R_INVALID_FILE)
decrypt_path = os.path.expanduser(args.write) if args.write else f"{rfe_path[:-len(enc_ext)]}"
if not decrypt_path: print(f"Invalid output filename"); exit(R_INVALID_FILE)
rfe_exists = os.path.isfile(rfe_path)
file_exists = os.path.isfile(decrypt_path)
if not identity: print(f"Cannot decrypt \"{rfe_path}\", no identity specified"); exit(R_NO_IDENTITY)
if not identity.get_private_key(): print(f"Cannot decrypt \"{rfe_path}\", the identity does not hold a private key"); exit(R_NO_PRVKEY)
if not rfe_exists: print(f"The file \"{rfe_path}\" does not exist"); exit(R_NO_FILE)
if file_exists and not args.force:
print(f"The decryption output file \"{decrypt_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS)
try:
with open(rfe_path, "rb") as input_fh:
try:
with open(decrypt_path, "wb") as output_fh:
data_remaining = True
wrote = 0
while data_remaining:
chunk = input_fh.read(DEC_CHUNK)
if chunk:
decrypted = identity.decrypt(chunk)
if not decrypted: print(f"The provided identity could not decrypt the file"); exit(R_DECRYPT_FAILED)
else: wrote += output_fh.write(decrypted)
print(f"\rWrote {RNS.prettysize(wrote)} to {decrypt_path} ", end="")
else: data_remaining = False
except Exception as e: print(f"\nError writing decrypted output to {decrypt_path}: {e}"); exit(R_WRITE_ERROR)
except Exception as e: print(f"\nError reading {rfe_path} for decryption: {e}"); exit(R_WRITE_ERROR)
print(f"\nFile {rfe_path} decrypted to {decrypt_path}"); exit(R_OK)
################
# File Output #
@@ -566,6 +639,7 @@ def write_identity(args, identity):
else: print("Identity holds neither a public nor private key"); exit(R_NO_KEYS)
except Exception as e: print("Error while writing imported identity to file: "+str(e)); exit(R_WRITE_ERROR)
###################
# Terminal Output #
###################
@@ -637,166 +711,3 @@ def spin(until=None, msg=None, timeout=None):
if __name__ == "__main__":
main()
############ Legacy reference
# if identity != None:
# data_output = None
# if args.encrypt and not args.write and not args.stdout and args.read:
# args.write = str(args.read)+"."+ENCRYPT_EXT
# if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith("."+ENCRYPT_EXT):
# args.write = str(args.read).replace("."+ENCRYPT_EXT, "")
# if args.write:
# if not args.force and os.path.isfile(args.write):
# print("Output file "+str(args.write)+" already exists. Not overwriting.")
# exit(15)
# else:
# try:
# data_output = open(args.write, "wb")
# except Exception as e:
# print("Could not open output file for writing")
# print("The contained exception was: "+str(e))
# exit(15)
# # TODO: Actually expand this to a good solution
# # probably need to create a wrapper that takes
# # into account not closing stdout when done
# # elif args.stdout:
# # data_output = sys.stdout
# if args.sign:
# if identity.prv == None:
# print("Specified Identity does not hold a private key. Cannot sign.")
# exit(16)
# if not data_input:
# if not args.stdout:
# print("Signing requested, but no input data specified")
# exit(17)
# else:
# if not data_output:
# if not args.stdout:
# print("Signing requested, but no output specified")
# exit(18)
# if not args.stdout:
# print("Signing "+str(args.read))
# try:
# data_output.write(identity.sign(data_input.read()))
# data_output.close()
# data_input.close()
# if not args.stdout:
# if args.read:
# print("File "+str(args.read)+" signed with "+str(identity)+" to "+str(args.write))
# exit(0)
# except Exception as e:
# if not args.stdout:
# print("An error ocurred while encrypting data.")
# print("The contained exception was: "+str(e))
# try:
# data_output.close()
# except:
# pass
# try:
# data_input.close()
# except:
# pass
# exit(19)
# if args.encrypt:
# if not data_input:
# if not args.stdout:
# print("Encryption requested, but no input data specified")
# exit(24)
# else:
# if not data_output:
# if not args.stdout:
# print("Encryption requested, but no output specified")
# exit(25)
# if not args.stdout:
# print("Encrypting "+str(args.read))
# try:
# more_data = True
# while more_data:
# chunk = data_input.read(CHUNK_SIZE)
# if chunk:
# data_output.write(identity.encrypt(chunk))
# else:
# more_data = False
# data_output.close()
# data_input.close()
# if not args.stdout:
# if args.read:
# print("File "+str(args.read)+" encrypted for "+str(identity)+" to "+str(args.write))
# exit(0)
# except Exception as e:
# if not args.stdout:
# print("An error ocurred while encrypting data.")
# print("The contained exception was: "+str(e))
# try:
# data_output.close()
# except:
# pass
# try:
# data_input.close()
# except:
# pass
# exit(26)
# if args.decrypt:
# if identity.prv == None:
# print("Specified Identity does not hold a private key. Cannot decrypt.")
# exit(27)
# if not data_input:
# if not args.stdout:
# print("Decryption requested, but no input data specified")
# exit(28)
# else:
# if not data_output:
# if not args.stdout:
# print("Decryption requested, but no output specified")
# exit(29)
# if not args.stdout:
# print("Decrypting "+str(args.read)+"...")
# try:
# more_data = True
# while more_data:
# chunk = data_input.read(CHUNK_SIZE)
# if chunk:
# plaintext = identity.decrypt(chunk)
# if plaintext == None:
# if not args.stdout:
# print("Data could not be decrypted with the specified Identity")
# exit(30)
# else:
# data_output.write(plaintext)
# else:
# more_data = False
# data_output.close()
# data_input.close()
# if not args.stdout:
# if args.read:
# print("File "+str(args.read)+" decrypted with "+str(identity)+" to "+str(args.write))
# exit(0)
# except Exception as e:
# if not args.stdout:
# print("An error ocurred while decrypting data.")
# print("The contained exception was: "+str(e))
# try: data_output.close()
# except: pass
# try: data_input.close()
# except: pass
# exit(31)