Added embedded message signing, validation and viewing to rnid

This commit is contained in:
Mark Qvist
2026-05-13 01:14:41 +02:00
parent eb5d46b20b
commit 9179b914d5
+119 -4
View File
@@ -45,10 +45,12 @@ from RNS.Cryptography.Hashes import file_sha256
APP_NAME = "rns"
DEFAULT_ASPECTS = f"{APP_NAME}.id"
NO_MESSAGE = 0x01
PRV_EXT = "rid"
PUB_EXT = "pub"
SIG_EXT = "rsg"
MSG_EXT = "rsm"
ENCRYPT_EXT = "rfe"
CHUNK_BLOCKS = 1024*1024
ENC_CHUNK = CHUNK_BLOCKS*RNS.Identity.AES256_BLOCKSIZE
@@ -69,6 +71,7 @@ R_INVALID_ASPECTS = 9
R_INVALID_SIGNATURE = 10
R_FILE_EXISTS = 11
R_DECRYPT_FAILED = 12
R_INVALID_ARGS = 250
R_SEQUENCE_ERROR = 251
R_READ_ERROR = 252
R_WRITE_ERROR = 253
@@ -79,7 +82,7 @@ reticulum = None
def validate_args(args):
ops = 0;
for o in [args.encrypt, args.decrypt, args.validate, args.sign]:
for o in [args.encrypt, args.decrypt, args.validate, args.sign, args.sign_message]:
if o: ops += 1
if ops > 1: print("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation"); exit(1)
@@ -89,7 +92,7 @@ def validate_args(args):
if g > 1: print("The -i, -g, -m and -M args are mutually exclusive"); exit(1)
g = 0;
for a in [args.base64, args.base32, args.hex]:
for a in [args.base64, args.base32, args.base256, args.hex]:
if a: g += 1
if g > 1: print("The -b, -B, --hex and --base256 args are mutually exclusive"); exit(1)
@@ -119,6 +122,7 @@ def main():
parser.add_argument("-e", "--encrypt", metavar="file", action="store", nargs="*", default=None, help="encrypt file")
parser.add_argument("-V", "--validate", metavar="path", action="store", nargs="*", default=None, help="validate signature")
parser.add_argument("-s", "--sign", metavar="path", action="store", nargs="*", default=None, help="sign file")
parser.add_argument("-S", "--sign-message", metavar="path", action="store", nargs="?", const=NO_MESSAGE, default=None, help="create embedded signed message")
parser.add_argument("--raw", action="store_true", default=False, help="sign raw input data instead of hashing first")
# I/O Control
@@ -157,6 +161,7 @@ def main():
if args.announce: announce(args, identity); op = True
if args.validate: validate(args, identity or args.identity); op = True
if args.sign: sign(args, identity); op = True
if args.sign_message: sign_message(args, identity); op = True
if args.encrypt: encrypt(args, identity); op = True
if args.decrypt: decrypt(args, identity); op = True
if args.write: write_identity(args, identity); op = True
@@ -398,6 +403,14 @@ def get_rsg_data(rsg):
return rsg_data
def extract_signed_rsg_data(rsg):
siglen = RNS.Identity.SIGLENGTH//8
rsg_data = get_rsg_data(rsg)
envelope = rsg_data[siglen:]
try: return mp.unpackb(envelope)
except: return None
def get_rsg_hash(message):
sha = None
if type(message) == bytes: sha = sha256(message)
@@ -466,7 +479,7 @@ def validate_rsg(rsg, message=None, required_signer=None):
return False, signed_data, signing_identity
def create_rsg(signer_identity, message, note=None, meta=None, output="bin"):
def create_rsg(signer_identity, message, embed=False, note=None, meta=None, output="bin"):
if not output in ["bin", "hex", "base32", "base256", "base64"]: raise TypeError(f"Invalid output format for rsg creation")
if not type(signer_identity) == RNS.Identity: raise TypeError(f"{signer_identity} is not a Reticulum Identity")
if not signer_identity.get_private_key(): raise ValueError(f"{signer_identity} does not hold a private key")
@@ -476,6 +489,10 @@ def create_rsg(signer_identity, message, note=None, meta=None, output="bin"):
"pubkey": signer_identity.get_public_key(),
"note" : note } }
if embed:
if type(message) == str: message = message.encode("utf-8")
signed_data["message"] = message
if meta and type(meta) == dict:
for key in meta:
if not key in signed_data["meta"]: signed_data["meta"]["key"] = meta["key"]
@@ -559,14 +576,17 @@ def validate(args, identity, __recursive=False):
if len(paths) != validated: print(f"Sequence error on recursive signature validation"); exit(R_SEQUENCE_ERROR)
else: exit(R_OK)
msg_ext = f".{MSG_EXT}"
sig_ext = f".{SIG_EXT}"
validate_path = os.path.expanduser(args.validate)
path_is_msgfile = validate_path.lower().endswith(msg_ext)
path_is_sigfile = validate_path.lower().endswith(sig_ext)
if path_is_sigfile: signature_path = validate_path; file_path = validate_path[:-len(sig_ext)]
else: signature_path = f"{validate_path}{sig_ext}"; file_path = validate_path
signature_exists = os.path.isfile(signature_path)
file_exists = os.path.isfile(file_path)
if path_is_msgfile: return validate_message(args, identity, __recursive=__recursive)
if not file_exists: print(f"The validation target \"{file_path}\" does not exist"); exit(R_NO_FILE)
if not signature_exists: print(f"No signature file exists for \"{file_path}\""); exit(R_NO_FILE)
@@ -610,6 +630,39 @@ def validate(args, identity, __recursive=False):
except Exception as e: print(f"Could not validate signature: {e}"); exit(R_READ_ERROR)
def validate_message(args, identity, __recursive=False):
msg_ext = f".{MSG_EXT}"
validate_path = os.path.expanduser(args.validate)
path_is_msgfile = validate_path.lower().endswith(msg_ext)
if path_is_msgfile: signature_path = validate_path
signature_exists = os.path.isfile(signature_path)
if not signature_exists: print(f"The signature file \"{signature_path}\" does not exist"); exit(R_NO_FILE)
try:
with open(signature_path, "rb") as fh: rsg = fh.read()
except Exception as e: print(f"Could not read rsg: {e}"); exit(R_READ_ERROR)
if type(identity) == str:
if not len(identity) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2: print("Invalid identity hash length"); exit(R_INVALID_IDENTITY)
try: identity = bytes.fromhex(identity)
except Exception as e: print(f"Invalid identity hash: {e}"); exit(R_INVALID_IDENTITY)
try:
rsm_contents = extract_signed_rsg_data(rsg)
if not "message" in rsm_contents: print(f"No embedded message in {signature_path}"); exit(R_INVALID_SIGNATURE)
valid, signed_data, signing_identity = validate_rsg(rsg, message=rsm_contents["message"], required_signer=identity)
identity_str = RNS.prettyhexrep(identity) if type(identity) == bytes else f"{identity}"
signer_description = f"\nThe message was NOT signed by {identity_str or signing_identity}" if identity else ""
if not valid: print(f"Invalid signature in {signature_path}{signer_description}"); exit(R_INVALID_SIGNATURE)
else:
print(f"\nSignature is valid, the following message was signed by {signing_identity}:\n")
print(signed_data["message"].decode("utf-8"))
return exit(R_OK) if not __recursive else R_OK
except Exception as e: print(f"Error while validating {signature_path}: {e}"); exit(R_UNKNOWN_ERROR)
def sign(args, identity, __recursive=False):
if type(args.sign) == list:
paths = args.sign.copy()
@@ -653,12 +706,47 @@ def sign(args, identity, __recursive=False):
with open(rsg_path, "wb") as out_file: out_file.write(rsg)
elif output in ["base32", "base64", "base256", "hex"]: print(f"\n{wrap_rsg(rsg)}\n")
else: print("No valid output format specified")
else: print("No valid output format specified"); exit(R_INVALID_ARGS)
print(f"Signed file {sign_path} with {identity}"); return exit(R_OK) if not __recursive else R_OK
except Exception as e: print(f"Could not sign {sign_path}: {e}"); exit(R_UNKNOWN_ERROR)
def sign_message(args, identity):
message = args.sign_message
if args.base32: output = "base32"
elif args.base64: output = "base64"
elif args.base256: output = "base256"
elif args.hex: output = "hex"
else: output = "bin"
if output == "bin" and not args.write: print("No write path specified"); exit(R_INVALID_ARGS)
if not identity.get_private_key(): print(f"Cannot sign \"{sign_path}\", the identity does not hold a private key"); exit(R_NO_PRVKEY)
if message == NO_MESSAGE: message = get_editor_content()
if not message: print("No message specified"); exit(R_INVALID_ARGS)
try:
rsg = create_rsg(identity, message, embed=True, output=output)
if not rsg: print(f"No signature created, not writing"); exit(R_UNKNOWN_ERROR)
if output == "bin":
sig_ext = f".{MSG_EXT}"
rsg_path = os.path.expanduser(args.write)
rsg_path = f"{rsg_path}{sig_ext}" if not rsg_path.endswith(sig_ext) else rsg_path
signature_exists = os.path.isfile(rsg_path)
if signature_exists and not args.force: print(f"The signature file \"{rsg_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS)
with open(rsg_path, "wb") as out_file: out_file.write(rsg)
print(f"Message signed with {identity} saved to {rsg_path}"); exit(R_OK)
elif output in ["base32", "base64", "base256", "hex"]: print(f"\n{wrap_rsg(rsg)}\n")
else: print("No valid output format specified"); exit(R_INVALID_ARGS)
print(f"Message signed with {identity}"); exit(R_OK)
except Exception as e: print(f"Could not sign message: {e}"); exit(R_UNKNOWN_ERROR)
######################################
# Encryption & Decryption Operations #
@@ -851,6 +939,33 @@ def export_prv_identity(args, identity):
# Helper & Utility Functions #
##############################
def get_editor_content():
import subprocess
from tempfile import NamedTemporaryFile
template = ""
editor = os.environ.get("EDITOR", "")
if not editor:
for fallback in ["nano", "vim", "vi"]:
try:
subprocess.run(["which", fallback], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
editor = fallback
break
except subprocess.CalledProcessError: continue
if not editor: print("Could not launch editor"); exit(R_READ_ERROR);
try:
with NamedTemporaryFile(mode="w+", suffix=".tmp", delete=False) as tmp:
tmp_path = tmp.name
tmp.write(template)
result = subprocess.run([editor, tmp_path])
if result.returncode != 0: print(f"Editor exited with error code {result.returncode}"); os.unlink(tmp_path); exit(R_READ_ERROR)
with open(tmp_path, "r") as f: content = f.read()
os.unlink(tmp_path)
return content.encode("utf-8")
except Exception as e: print(f"Could not get content from editor: {e}"); exit(R_READ_ERROR)
def spin(until=None, msg=None, timeout=None):
i = 0
syms = "⢄⢂⢁⡁⡈⡐⡠"