mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-06-11 15:23:31 -07:00
Refactoring work for rnid
This commit is contained in:
+106
-46
@@ -39,7 +39,7 @@ import io
|
||||
import base64
|
||||
|
||||
from RNS._version import __version__
|
||||
from RNS.vendor.umsgpack import umsgpack as mp
|
||||
from RNS.vendor import umsgpack as mp
|
||||
from RNS.Cryptography.Hashes import sha256
|
||||
from RNS.Cryptography.Hashes import file_sha256
|
||||
|
||||
@@ -58,6 +58,7 @@ R_OK = 0
|
||||
R_FILE_EXISTS = 1
|
||||
R_NO_FILE = 1
|
||||
R_NO_SIG_FILE = 1
|
||||
R_NO_IDENTITY = 1
|
||||
R_NO_PUBKEY = 2
|
||||
R_NO_PRVKEY = 3
|
||||
R_NO_KEYS = 4
|
||||
@@ -373,50 +374,87 @@ def announce(args, identity):
|
||||
# Cryptographic Operations #
|
||||
############################
|
||||
|
||||
def read_rsg(rsg):
|
||||
def get_rsg_data(rsg):
|
||||
rsg_data = None
|
||||
if type(rsg) == bytes: rsg_data = rsg
|
||||
elif type(rsg) == io.BufferReader: rsg_data = rsg.read()
|
||||
if type(rsg) == bytes: rsg_data = rsg
|
||||
elif type(rsg) == io.BufferedReader: rsg_data = rsg.read()
|
||||
elif type(rsg) == str:
|
||||
try: rsg_data = base64.urlsafe_b64decode(rsg)
|
||||
except: pass
|
||||
try: rsg_data = base64.b32decode(rsg)
|
||||
except: pass
|
||||
try: rsg_data = base64.urlsafe_b64decode(rsg)
|
||||
except: pass
|
||||
try: rsg_data = base64.b32decode(rsg)
|
||||
except: pass
|
||||
|
||||
return rsg_data
|
||||
|
||||
def get_rsg_hash(message):
|
||||
sha = None
|
||||
if type(message) == bytes: sha = sha256(message)
|
||||
elif type(message) == str: sha = sha256(message.encode("utf-8"))
|
||||
elif type(message) == io.BufferedReader: sha = file_sha256(message)
|
||||
else: raise TypeError(f"Invalid input type {type(message)} for rsg creation")
|
||||
|
||||
if not sha: raise ValueError("Hash calculation for rsg signature input failed")
|
||||
return sha
|
||||
|
||||
def rsg_is_legacy_format(rsg):
|
||||
rsg_data = get_rsg_data(rsg)
|
||||
if not rsg_data: return False
|
||||
return True if len(rsg_data) == RNS.Identity.SIGLENGTH//8 else False
|
||||
|
||||
def validate_rsg(rsg, message=None, required_signer=None):
|
||||
if not message: raise ValueError(f"No message specified for rsg validation")
|
||||
if not type(required_signer) in [RNS.Identity, type(None)]: raise TypeError(f"Invalid required signer type {type(required_signer)}")
|
||||
|
||||
siglen = RNS.Identity.SIGLENGTH//8
|
||||
rsg_data = get_rsg_data(rsg)
|
||||
rsg_hash = get_rsg_hash(message)
|
||||
|
||||
if len(rsg_data) == siglen: raise ValueError(f"Cannot validate legacy rsg format")
|
||||
|
||||
if not rsg_data: return False, None, None
|
||||
else:
|
||||
siglen = Identity.SIGLENGTH//8
|
||||
if len(rsg_data) == siglen: return rsg_data, None
|
||||
if len(rsg_data) < siglen+1: return False
|
||||
if len(rsg_data) < siglen+1: return False, None, None
|
||||
else:
|
||||
signature = rsg_data[:siglen]
|
||||
envelope = rsg_data[siglen:]
|
||||
signing_identity = None
|
||||
signature = rsg_data[:siglen]
|
||||
envelope = rsg_data[siglen:]
|
||||
|
||||
try: signed_data = mp.unpackb(envelope)
|
||||
except: return False
|
||||
except: return False, None, None
|
||||
|
||||
if not "hashtype" in signed_data or not "hash" in signed_data: return False, None
|
||||
if not signed_data["hash"] in RSG_HASHTYPES: return False, None
|
||||
if not "meta" in signed_data: return False, None
|
||||
if not "signer" in signed_data["meta"]: return False, None
|
||||
if not "pubkey" in signed_data["meta"]: return False, None
|
||||
if not "note" in signed_data["meta"]: return False, None
|
||||
if not "hashtype" in signed_data or not "hash" in signed_data: return False, None, None
|
||||
if not signed_data["hash"] in RSG_HASHTYPES: return False, None, None
|
||||
if not "meta" in signed_data: return False, None, None
|
||||
if not "signer" in signed_data["meta"]: return False, None, None
|
||||
if not "pubkey" in signed_data["meta"]: return False, None, None
|
||||
if not "note" in signed_data["meta"]: return False, None, None
|
||||
if signed_data["hash"] != rsg_hash: return False, None, None
|
||||
|
||||
return signature, signed_data
|
||||
try:
|
||||
if required_signer:
|
||||
signing_identity = required_signer
|
||||
|
||||
else:
|
||||
signing_identity = RNS.Identity(create_keys=false)
|
||||
signing_identity.load_public_key(signed_data["meta"]["pubkey"])
|
||||
|
||||
except: return False, None, None
|
||||
|
||||
if not signing_identity: return False, None, None
|
||||
else:
|
||||
if not signing_identity.validate(signature, envelope): return False, signed_data, signing_identity
|
||||
else: return True, signed_data, signing_identity
|
||||
|
||||
def create_rsg(signer_identity, signature_input, note=None, meta=None, output="bin"):
|
||||
return False, signed_data, signing_identity
|
||||
|
||||
def create_rsg(signer_identity, message, note=None, meta=None, output="bin"):
|
||||
if not output in ["bin", "hex", "base32", "base64"]: raise TypeError(f"Invalid output format for rsg creation")
|
||||
if not type(signer_identity) == RNS.Identity.Identity: raise TypeError(f"{signer_identity} is not a Reticulum Identity")
|
||||
if not type(signer_identity) == RNS.Identity: raise TypeError(f"{signer_identity} is not a Reticulum Identity")
|
||||
if not signer_identity.get_private_key(): raise ValueError(f"{signer_identity} does not hold a private key")
|
||||
|
||||
if type(signature_input) == bytes: sha = sha256(signature_input)
|
||||
elif type(signature_input) == str: sha = sha256(signature_input.encode("utf-8"))
|
||||
elif type(signature_input) == io.BufferReader: sha = file_sha256(signature_input)
|
||||
else: raise TypeError(f"Invalid input type {type(signature_input)} for rsg creation")
|
||||
|
||||
signed_data = { "hashtype": "sha256", "hash": sha,
|
||||
"meta": { "signer": identity.hash,
|
||||
"pubkey": identity.get_public_key(),
|
||||
signed_data = { "hashtype": "sha256", "hash": get_rsg_hash(message),
|
||||
"meta": { "signer": signer_identity.hash,
|
||||
"pubkey": signer_identity.get_public_key(),
|
||||
"note" : note } }
|
||||
|
||||
if meta and type(meta) == dict:
|
||||
@@ -441,37 +479,60 @@ def validate(args, identity):
|
||||
if not signature_exists: print(f"No signature file exists for \"{file_path}\""); exit(R_NO_FILE)
|
||||
|
||||
try:
|
||||
with open(signature_path, "rb") as fh: signature = fh.read()
|
||||
except Exception as e: print(f"Could not read signature: {e}"); exit(R_READ_ERROR)
|
||||
with open(signature_path, "rb") as fh: is_legacy_format = rsg_is_legacy_format(fh)
|
||||
except Exception as e: print(f"Could not detect rsg format: {e}"); exit(R_UNKNOWN_ERROR)
|
||||
|
||||
try:
|
||||
with open(file_path, "rb") as fh: valid = identity.validate(signature, fh.read())
|
||||
if not valid: print(f"Invalid signature {signature_path} for file {file_path}"); exit(R_INVALID_SIGNATURE)
|
||||
else: print(f"Signature is valid, the file {file_path} was signed by {identity}."); exit(R_OK)
|
||||
|
||||
except Exception as e: print(f"Could not validate signature: {e}"); exit(R_READ_ERROR)
|
||||
if not is_legacy_format:
|
||||
try:
|
||||
with open(signature_path, "rb") as fh: rsg = fh.read()
|
||||
except Exception as e: print(f"Could not read rsg: {e}"); exit(R_READ_ERROR)
|
||||
|
||||
try:
|
||||
with open(file_path, "rb") as fh:
|
||||
try:
|
||||
valid, signed_data, signing_identity = validate_rsg(rsg, message=fh, required_signer=identity)
|
||||
if not valid: print(f"Invalid signature {signature_path} for file {file_path}"); exit(R_INVALID_SIGNATURE)
|
||||
else: print(f"Signature is valid, the file {file_path} was signed by {signing_identity}."); exit(R_OK)
|
||||
|
||||
except Exception as e: print(f"Error while validating {signature_path}: {e}"); exit(R_UNKNOWN_ERROR)
|
||||
|
||||
except Exception as e: print(f"Could not read {file_path}: {e}"); exit(R_READ_ERROR)
|
||||
|
||||
else:
|
||||
if identity == None: print(f"Cannot validate legacy rsg signatures without an explicit required identity"); exit(R_NO_IDENTITY)
|
||||
try:
|
||||
with open(signature_path, "rb") as fh: signature = fh.read()
|
||||
except Exception as e: print(f"Could not read signature: {e}"); exit(R_READ_ERROR)
|
||||
|
||||
try:
|
||||
with open(file_path, "rb") as fh: valid = identity.validate(signature, fh.read())
|
||||
if not valid: print(f"Invalid signature {signature_path} for file {file_path}"); exit(R_INVALID_SIGNATURE)
|
||||
else: print(f"Signature is valid, the file {file_path} was signed by {identity}."); exit(R_OK)
|
||||
|
||||
except Exception as e: print(f"Could not validate signature: {e}"); exit(R_READ_ERROR)
|
||||
|
||||
def sign(args, identity):
|
||||
sig_ext = f".{SIG_EXT}"
|
||||
sign_path = os.path.expanduser(args.sign)
|
||||
signature_path = f"{sign_path}{sig_ext}"
|
||||
rsg_path = f"{sign_path}{sig_ext}"
|
||||
file_exists = os.path.isfile(sign_path)
|
||||
signature_exists = os.path.isfile(signature_path)
|
||||
signature_exists = os.path.isfile(rsg_path)
|
||||
|
||||
if not identity.get_private_key(): print(f"Cannot sign \"{sign_path}\", the identity does not hold a private key"); exit(R_NO_PRVKEY)
|
||||
if not file_exists: print(f"The file \"{sign_path}\" does not exist"); exit(R_NO_FILE)
|
||||
if signature_exists and not args.force:
|
||||
print(f"The signature file \"{signature_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS)
|
||||
print(f"The signature file \"{rsg_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS)
|
||||
|
||||
try:
|
||||
if args.raw:
|
||||
with open(sign_path, "rb") as fh: data = fh.read()
|
||||
with open(signature_path, "wb") as fh: fh.write(identity.sign(data))
|
||||
with open(rsg_path, "wb") as fh: fh.write(identity.sign(data))
|
||||
|
||||
else:
|
||||
with open(sign_path, "rb") as in_file:
|
||||
with open(signature_path, "rb") as out_file:
|
||||
out_file.write(create_rsg(identity, in_file))
|
||||
rsg = create_rsg(identity, in_file)
|
||||
if not rsg: print(f"No signature created, not writing"); exit(R_UNKNOWN_ERROR)
|
||||
with open(rsg_path, "wb") as out_file: out_file.write(rsg)
|
||||
|
||||
print(f"Signed file {sign_path} with {identity}"); exit(R_OK)
|
||||
|
||||
@@ -581,7 +642,6 @@ if __name__ == "__main__":
|
||||
|
||||
|
||||
############ Legacy reference
|
||||
|
||||
# if identity != None:
|
||||
|
||||
# data_output = None
|
||||
|
||||
Reference in New Issue
Block a user