From 9179b914d5bd4bd022acfd22d529917c978a8726 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 13 May 2026 01:14:41 +0200 Subject: [PATCH] Added embedded message signing, validation and viewing to rnid --- RNS/Utilities/rnid.py | 123 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 119 insertions(+), 4 deletions(-) diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index 53667174..571ae065 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -45,10 +45,12 @@ from RNS.Cryptography.Hashes import file_sha256 APP_NAME = "rns" DEFAULT_ASPECTS = f"{APP_NAME}.id" +NO_MESSAGE = 0x01 PRV_EXT = "rid" PUB_EXT = "pub" SIG_EXT = "rsg" +MSG_EXT = "rsm" ENCRYPT_EXT = "rfe" CHUNK_BLOCKS = 1024*1024 ENC_CHUNK = CHUNK_BLOCKS*RNS.Identity.AES256_BLOCKSIZE @@ -69,6 +71,7 @@ R_INVALID_ASPECTS = 9 R_INVALID_SIGNATURE = 10 R_FILE_EXISTS = 11 R_DECRYPT_FAILED = 12 +R_INVALID_ARGS = 250 R_SEQUENCE_ERROR = 251 R_READ_ERROR = 252 R_WRITE_ERROR = 253 @@ -79,7 +82,7 @@ reticulum = None def validate_args(args): ops = 0; - for o in [args.encrypt, args.decrypt, args.validate, args.sign]: + for o in [args.encrypt, args.decrypt, args.validate, args.sign, args.sign_message]: if o: ops += 1 if ops > 1: print("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation"); exit(1) @@ -89,7 +92,7 @@ def validate_args(args): if g > 1: print("The -i, -g, -m and -M args are mutually exclusive"); exit(1) g = 0; - for a in [args.base64, args.base32, args.hex]: + for a in [args.base64, args.base32, args.base256, args.hex]: if a: g += 1 if g > 1: print("The -b, -B, --hex and --base256 args are mutually exclusive"); exit(1) @@ -119,6 +122,7 @@ def main(): parser.add_argument("-e", "--encrypt", metavar="file", action="store", nargs="*", default=None, help="encrypt file") parser.add_argument("-V", "--validate", metavar="path", action="store", nargs="*", default=None, help="validate signature") parser.add_argument("-s", "--sign", metavar="path", action="store", nargs="*", default=None, help="sign file") + parser.add_argument("-S", "--sign-message", metavar="path", action="store", nargs="?", const=NO_MESSAGE, default=None, help="create embedded signed message") parser.add_argument("--raw", action="store_true", default=False, help="sign raw input data instead of hashing first") # I/O Control @@ -157,6 +161,7 @@ def main(): if args.announce: announce(args, identity); op = True if args.validate: validate(args, identity or args.identity); op = True if args.sign: sign(args, identity); op = True + if args.sign_message: sign_message(args, identity); op = True if args.encrypt: encrypt(args, identity); op = True if args.decrypt: decrypt(args, identity); op = True if args.write: write_identity(args, identity); op = True @@ -398,6 +403,14 @@ def get_rsg_data(rsg): return rsg_data +def extract_signed_rsg_data(rsg): + siglen = RNS.Identity.SIGLENGTH//8 + rsg_data = get_rsg_data(rsg) + envelope = rsg_data[siglen:] + + try: return mp.unpackb(envelope) + except: return None + def get_rsg_hash(message): sha = None if type(message) == bytes: sha = sha256(message) @@ -466,7 +479,7 @@ def validate_rsg(rsg, message=None, required_signer=None): return False, signed_data, signing_identity -def create_rsg(signer_identity, message, note=None, meta=None, output="bin"): +def create_rsg(signer_identity, message, embed=False, note=None, meta=None, output="bin"): if not output in ["bin", "hex", "base32", "base256", "base64"]: raise TypeError(f"Invalid output format for rsg creation") if not type(signer_identity) == RNS.Identity: raise TypeError(f"{signer_identity} is not a Reticulum Identity") if not signer_identity.get_private_key(): raise ValueError(f"{signer_identity} does not hold a private key") @@ -476,6 +489,10 @@ def create_rsg(signer_identity, message, note=None, meta=None, output="bin"): "pubkey": signer_identity.get_public_key(), "note" : note } } + if embed: + if type(message) == str: message = message.encode("utf-8") + signed_data["message"] = message + if meta and type(meta) == dict: for key in meta: if not key in signed_data["meta"]: signed_data["meta"]["key"] = meta["key"] @@ -559,14 +576,17 @@ def validate(args, identity, __recursive=False): if len(paths) != validated: print(f"Sequence error on recursive signature validation"); exit(R_SEQUENCE_ERROR) else: exit(R_OK) + msg_ext = f".{MSG_EXT}" sig_ext = f".{SIG_EXT}" validate_path = os.path.expanduser(args.validate) + path_is_msgfile = validate_path.lower().endswith(msg_ext) path_is_sigfile = validate_path.lower().endswith(sig_ext) if path_is_sigfile: signature_path = validate_path; file_path = validate_path[:-len(sig_ext)] else: signature_path = f"{validate_path}{sig_ext}"; file_path = validate_path signature_exists = os.path.isfile(signature_path) file_exists = os.path.isfile(file_path) + if path_is_msgfile: return validate_message(args, identity, __recursive=__recursive) if not file_exists: print(f"The validation target \"{file_path}\" does not exist"); exit(R_NO_FILE) if not signature_exists: print(f"No signature file exists for \"{file_path}\""); exit(R_NO_FILE) @@ -610,6 +630,39 @@ def validate(args, identity, __recursive=False): except Exception as e: print(f"Could not validate signature: {e}"); exit(R_READ_ERROR) +def validate_message(args, identity, __recursive=False): + msg_ext = f".{MSG_EXT}" + validate_path = os.path.expanduser(args.validate) + path_is_msgfile = validate_path.lower().endswith(msg_ext) + if path_is_msgfile: signature_path = validate_path + signature_exists = os.path.isfile(signature_path) + + if not signature_exists: print(f"The signature file \"{signature_path}\" does not exist"); exit(R_NO_FILE) + + try: + with open(signature_path, "rb") as fh: rsg = fh.read() + except Exception as e: print(f"Could not read rsg: {e}"); exit(R_READ_ERROR) + + if type(identity) == str: + if not len(identity) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2: print("Invalid identity hash length"); exit(R_INVALID_IDENTITY) + try: identity = bytes.fromhex(identity) + except Exception as e: print(f"Invalid identity hash: {e}"); exit(R_INVALID_IDENTITY) + + try: + rsm_contents = extract_signed_rsg_data(rsg) + if not "message" in rsm_contents: print(f"No embedded message in {signature_path}"); exit(R_INVALID_SIGNATURE) + valid, signed_data, signing_identity = validate_rsg(rsg, message=rsm_contents["message"], required_signer=identity) + identity_str = RNS.prettyhexrep(identity) if type(identity) == bytes else f"{identity}" + signer_description = f"\nThe message was NOT signed by {identity_str or signing_identity}" if identity else "" + if not valid: print(f"Invalid signature in {signature_path}{signer_description}"); exit(R_INVALID_SIGNATURE) + else: + print(f"\nSignature is valid, the following message was signed by {signing_identity}:\n") + print(signed_data["message"].decode("utf-8")) + + return exit(R_OK) if not __recursive else R_OK + + except Exception as e: print(f"Error while validating {signature_path}: {e}"); exit(R_UNKNOWN_ERROR) + def sign(args, identity, __recursive=False): if type(args.sign) == list: paths = args.sign.copy() @@ -653,12 +706,47 @@ def sign(args, identity, __recursive=False): with open(rsg_path, "wb") as out_file: out_file.write(rsg) elif output in ["base32", "base64", "base256", "hex"]: print(f"\n{wrap_rsg(rsg)}\n") - else: print("No valid output format specified") + else: print("No valid output format specified"); exit(R_INVALID_ARGS) print(f"Signed file {sign_path} with {identity}"); return exit(R_OK) if not __recursive else R_OK except Exception as e: print(f"Could not sign {sign_path}: {e}"); exit(R_UNKNOWN_ERROR) +def sign_message(args, identity): + message = args.sign_message + + if args.base32: output = "base32" + elif args.base64: output = "base64" + elif args.base256: output = "base256" + elif args.hex: output = "hex" + else: output = "bin" + + if output == "bin" and not args.write: print("No write path specified"); exit(R_INVALID_ARGS) + if not identity.get_private_key(): print(f"Cannot sign \"{sign_path}\", the identity does not hold a private key"); exit(R_NO_PRVKEY) + + if message == NO_MESSAGE: message = get_editor_content() + if not message: print("No message specified"); exit(R_INVALID_ARGS) + + try: + rsg = create_rsg(identity, message, embed=True, output=output) + if not rsg: print(f"No signature created, not writing"); exit(R_UNKNOWN_ERROR) + + if output == "bin": + sig_ext = f".{MSG_EXT}" + rsg_path = os.path.expanduser(args.write) + rsg_path = f"{rsg_path}{sig_ext}" if not rsg_path.endswith(sig_ext) else rsg_path + signature_exists = os.path.isfile(rsg_path) + if signature_exists and not args.force: print(f"The signature file \"{rsg_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS) + with open(rsg_path, "wb") as out_file: out_file.write(rsg) + print(f"Message signed with {identity} saved to {rsg_path}"); exit(R_OK) + + elif output in ["base32", "base64", "base256", "hex"]: print(f"\n{wrap_rsg(rsg)}\n") + else: print("No valid output format specified"); exit(R_INVALID_ARGS) + + print(f"Message signed with {identity}"); exit(R_OK) + + except Exception as e: print(f"Could not sign message: {e}"); exit(R_UNKNOWN_ERROR) + ###################################### # Encryption & Decryption Operations # @@ -851,6 +939,33 @@ def export_prv_identity(args, identity): # Helper & Utility Functions # ############################## +def get_editor_content(): + import subprocess + from tempfile import NamedTemporaryFile + template = "" + editor = os.environ.get("EDITOR", "") + if not editor: + for fallback in ["nano", "vim", "vi"]: + try: + subprocess.run(["which", fallback], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + editor = fallback + break + except subprocess.CalledProcessError: continue + + if not editor: print("Could not launch editor"); exit(R_READ_ERROR); + try: + with NamedTemporaryFile(mode="w+", suffix=".tmp", delete=False) as tmp: + tmp_path = tmp.name + tmp.write(template) + + result = subprocess.run([editor, tmp_path]) + if result.returncode != 0: print(f"Editor exited with error code {result.returncode}"); os.unlink(tmp_path); exit(R_READ_ERROR) + with open(tmp_path, "r") as f: content = f.read() + os.unlink(tmp_path) + return content.encode("utf-8") + + except Exception as e: print(f"Could not get content from editor: {e}"); exit(R_READ_ERROR) + def spin(until=None, msg=None, timeout=None): i = 0 syms = "⢄⢂⢁⡁⡈⡐⡠"