Files
Reticulum/RNS/Utilities/rnid.py
T
2026-05-07 14:11:49 +02:00

744 lines
33 KiB
Python

#!/usr/bin/env python3
# Reticulum License
#
# Copyright (c) 2016-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
# its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
# an artificial intelligence, machine learning or language model training
# dataset, including but not limited to any use that contributes to the
# training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import RNS
import argparse
import time
import sys
import os
import io
import base64
from RNS._version import __version__
from RNS.vendor.umsgpack import umsgpack as mp
from RNS.Cryptography.Hashes import sha256
from RNS.Cryptography.Hashes import file_sha256
APP_NAME = "rns"
DEFAULT_ASPECTS = f"{APP_NAME}.id"
PRV_EXT = "rid"
PUB_EXT = "pub"
SIG_EXT = "rsg"
ENCRYPT_EXT = "rfe"
CHUNK_SIZE = 16*1024*1024
RSG_HASHTYPES = ["sha256"]
R_OK = 0
R_FILE_EXISTS = 1
R_NO_FILE = 1
R_NO_SIG_FILE = 1
R_NO_PUBKEY = 2
R_NO_PRVKEY = 3
R_NO_KEYS = 4
R_WRITE_ERROR = 5
R_INVALID_ASPECTS = 4
R_INVALID_SIGNATURE = 128
R_READ_ERROR = 252
R_WRITE_ERROR = 253
R_UNKNOWN_ERROR = 254
R_INTERRUPTED = 255
reticulum = None
def validate_args(args):
ops = 0;
for o in [args.encrypt, args.decrypt, args.validate, args.sign]:
if o: ops += 1
if ops > 1: print("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation"); exit(1)
g = 0;
for a in [args.import_pub, args.import_prv, args.identity, args.generate]:
if a: g += 1
if g > 1: print("The -i, -g, -m and -M args are mutually exclusive"); exit(1)
g = 0;
for a in [args.base64, args.base32]:
if a: g += 1
if g > 1: print("The -b and -B args are mutually exclusive"); exit(1)
return True
def main():
try:
parser = argparse.ArgumentParser(description="Reticulum Identity & Encryption Utility")
# Identity Resolution
parser.add_argument("--config", metavar="path", action="store", default=None, help="path to alternative Reticulum config directory", type=str)
parser.add_argument("-i", "--identity", metavar="rid", action="store", default=None, help="hexadecimal Reticulum identity or destination hash, or path to Identity file", type=str)
parser.add_argument("-g", "--generate", metavar="path", action="store", default=None, help="generate a new Identity and save to path")
parser.add_argument("-m", "--import-pub", dest="import_pub", metavar="rid", action="store", default=None, help="import public Reticulum identity in hex, base32 or base64 format, or from file", type=str)
parser.add_argument("-M", "--import-prv", dest="import_prv", metavar="rid", action="store", default=None, help="import Reticulum identity in hex, base32 or base64 format, or from file", type=str)
parser.add_argument("-x", "--export-pub", action="store_true", default=None, help="export public identity to hex, base32 or base64 format")
parser.add_argument("-X", "--export-prv", action="store_true", default=None, help="export private identity to hex, base32 or base64 format, or to file")
# Verbosity Control
parser.add_argument("-v", "--verbose", action="count", default=0, help="increase verbosity")
parser.add_argument("-q", "--quiet", action="count", default=0, help="decrease verbosity")
# Operations
parser.add_argument("-a", "--announce", metavar="aspects", action="store", nargs="?", const=DEFAULT_ASPECTS, default=None, help="announce a destination based on this Identity")
parser.add_argument("-H", "--hash", metavar="aspects", action="store", default=None, help="show destination hashes for other aspects for this Identity")
parser.add_argument("-d", "--decrypt", metavar="file", action="store", default=None, help="decrypt file")
parser.add_argument("-e", "--encrypt", metavar="file", action="store", default=None, help="encrypt file")
parser.add_argument("-V", "--validate", metavar="path", action="store", default=None, help="validate signature")
parser.add_argument("-s", "--sign", metavar="path", action="store", default=None, help="sign file")
parser.add_argument("--raw", action="store_true", default=False, help="sign raw input data instead of hashing first")
# I/O Control
parser.add_argument("-r", "--read", metavar="file", action="store", default=None, help="input file path", type=str)
parser.add_argument("-w", "--write", metavar="file", action="store", default=None, help="output file path", type=str)
parser.add_argument("-f", "--force", action="store_true", default=None, help="write output even if it overwrites existing files")
parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # "read input from STDIN instead of file"
parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file",
# Information Flow
parser.add_argument("-R", "--request", action="store_true", default=False, help="request unknown Identities from the network")
parser.add_argument("-t", action="store", metavar="seconds", type=float, help="identity request timeout before giving up", default=RNS.Transport.PATH_REQUEST_TIMEOUT)
parser.add_argument("-p", "--print-identity", action="store_true", default=False, help="print identity info and exit")
parser.add_argument("-P", "--print-private", action="store_true", default=False, help="allow displaying private keys")
# Formatting Control
parser.add_argument("-b", "--base64", action="store_true", default=False, help="Use base64-encoded input and output")
parser.add_argument("-B", "--base32", action="store_true", default=False, help="Use base32-encoded input and output")
parser.add_argument("--version", action="version", version="rnid {version}".format(version=__version__))
args = parser.parse_args()
validate_args(args)
if not args.read:
if args.encrypt: args.read = args.encrypt
if args.decrypt: args.read = args.decrypt
if args.sign: args.read = args.sign
if args.validate: args.read = args.validate
identity = get_operating_identity(args)
if not identity: print("Could not get working identity"); exit(9)
if args.print_identity: print_identity_information(args, identity)
if args.export_pub: export_pub_identity(args, identity)
if args.export_prv: export_prv_identity(args, identity)
if args.hash: print_hash_information(args, identity)
if args.write: write_identity(args, identity)
if args.announce: announce(args, identity)
if args.validate: validate(args, identity)
if args.sign: sign(args, identity)
if args.encrypt: encrypt(args, identity)
if args.decrypt: decrypt(args, identity)
exit(0)
except KeyboardInterrupt: print(""); exit(R_INTERRUPTED)
#####################
# Reticulum Helpers #
#####################
def ensure_reticulum(args):
global reticulum
if not reticulum:
targetloglevel = 4; verbosity = args.verbose; quietness = args.quiet
if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness
reticulum = RNS.Reticulum(configdir=args.config, loglevel=targetloglevel)
RNS.compact_log_fmt = True
if args.stdout: RNS.loglevel = -1
#################################
# Identity Loading & Resolution #
#################################
def get_operating_identity(args):
global reticulum
identity = None
if args.generate:
identity = RNS.Identity()
if not args.force and os.path.isfile(args.generate):
print("Identity file "+str(args.generate)+" already exists. Not overwriting.")
exit(R_FILE_EXISTS)
else:
try: identity.to_file(args.generate); print(f"New identity {identity} written to {args.generate}")
except Exception as e: print(f"An error ocurred while saving the generated Identity: {e}"); exit(R_WRITE_ERROR)
elif args.identity:
load_path = None
try: load_path = os.path.expanduser(args.identity)
except: pass
if load_path and os.path.isfile(load_path):
# Attempt to load Identity from .rid file
try:
identity = RNS.Identity.from_file(load_path)
print(f"Loaded Identity {identity} from {load_path}")
if not identity.get_private_key() or not identity.get_public_key():
raise SystemError("Missing key data in loaded identity")
except Exception as e:
print(f"Could not load Identity from specified file: {e}")
exit(9)
elif len(identity_str) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2:
# Attempt to recall Identity from hex-encoded hash
try:
ident_hash = bytes.fromhex(identity_str)
identity = RNS.Identity.recall(ident_hash) or RNS.Identity.recall(ident_hash, from_identity_hash=True)
if identity == None:
if not args.request:
print("Could not recall Identity for "+RNS.prettyhexrep(ident_hash)+".")
print("You can query the network for unknown Identities with the -R option.")
exit(5)
else:
ensure_reticulum(args)
RNS.Transport.request_path(ident_hash)
def spincheck(): return RNS.Identity.recall(ident_hash) != None
spin(spincheck, "Requesting unknown Identity for "+RNS.prettyhexrep(ident_hash), args.t)
if not spincheck(): print("Identity request timed out"); exit(6)
else:
identity = RNS.Identity.recall(ident_hash)
print("Received Identity "+str(identity)+" for destination "+RNS.prettyhexrep(ident_hash)+" from the network")
else:
ident_str = str(identity)
hash_str = RNS.prettyhexrep(ident_hash)
if ident_str == hash_str: print(f"Recalled Identity {ident_str}")
else: print(f"Recalled Identity {ident_str} for destination {hash_str}")
except Exception as e: print("Invalid hexadecimal hash provided"); exit(7)
elif args.import_pub or args.import_prv:
prvsize = RNS.Identity.KEYSIZE//8
pubsize = prvsize
identity_bytes = None
if args.import_pub:
try:
identity_bytes = None
import_path = os.path.expanduser(args.import_pub)
if os.path.isfile(import_path):
try:
with open(import_path, "rb") as fh: file_input = fh.read()
if file_input and len(file_input) == pubsize:
identity_bytes = file_input
print(f"Reticulum Identity imported from {import_path}")
except: pass
if not identity_bytes:
if len(args.import_pub) == pubsize*2:
try:
identity_bytes = bytes.fromhex(args.import_pub)
print("Reticulum Identity imported from hex input")
except: pass
if not identity_bytes:
try:
b32_decoded = base64.b32decode(args.import_pub)
if len(b32_decoded) == pubsize:
identity_bytes = b32_decoded
print("Reticulum Identity imported from base32 input")
except: pass
if not identity_bytes:
try:
b64_decoded = base64.urlsafe_b64decode(args.import_pub)
if len(b64_decoded) == pubsize:
identity_bytes = b64_decoded
print("Reticulum Identity imported from base64 input")
except: pass
if not identity_bytes:
print("Could not decode specified data to a valid public Reticulum Identity")
exit(41)
except Exception as e:
print("Invalid identity data specified for private identity import: "+str(e))
exit(41)
elif args.import_prv:
try:
identity_bytes = None
import_path = os.path.expanduser(args.import_prv)
if os.path.isfile(import_path):
try:
with open(import_path, "rb") as fh: file_input = fh.read()
if file_input and len(file_input) == prvsize:
identity_bytes = file_input
print(f"Reticulum Identity imported from {import_path}")
except: pass
if not identity_bytes:
if len(args.import_prv) == prvsize*2:
try:
identity_bytes = bytes.fromhex(args.import_prv)
print("Reticulum Identity imported from hex input")
except: pass
if not identity_bytes:
try:
b32_decoded = base64.b32decode(args.import_prv)
if len(b32_decoded) == prvsize:
identity_bytes = b32_decoded
print("Reticulum Identity imported from base32 input")
except: pass
if not identity_bytes:
try:
b64_decoded = base64.urlsafe_b64decode(args.import_prv)
if len(b64_decoded) == prvsize:
identity_bytes = b64_decoded
print("Reticulum Identity imported from base64 input")
except: pass
if not identity_bytes:
print("Could not decode specified data to a valid private Reticulum Identity")
exit(41)
except Exception as e:
print("Invalid identity data specified for private identity import: "+str(e))
exit(41)
if args.import_prv:
try: identity = RNS.Identity.from_bytes(identity_bytes)
except Exception as e:
print("Could not create Reticulum identity from specified data: "+str(e))
exit(42)
elif args.import_pub:
try:
identity = RNS.Identity(create_keys=False)
identity.load_public_key(identity_bytes)
except Exception as e:
print("Could not create Reticulum identity from specified data: "+str(e))
exit(42)
return identity
######################
# Network Operations #
######################
def announce(args, identity):
try:
ensure_reticulum(args)
aspects = args.announce.split(".")
if not len(aspects) > 1: print("Invalid destination aspects specified"); exit(R_INVALID_ASPECTS)
else:
app_name = aspects[0]; aspects = aspects[1:]
if not identity.get_private_key(): print("Cannot announce this destination, since the private key is not held"); exit(R_NO_PRVKEY)
else:
destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, app_name, *aspects)
print(f"Announcing {args.announce} destination {RNS.prettyhexrep(destination.hash)} for identity {identity}")
destination.announce(); time.sleep(0.25)
except Exception as e: print(f"An error ocurred while attempting to send the announce: {e}"); exit(R_UNKNOWN_ERROR)
############################
# Cryptographic Operations #
############################
def read_rsg(rsg):
rsg_data = None
if type(rsg) == bytes: rsg_data = rsg
elif type(rsg) == io.BufferReader: rsg_data = rsg.read()
elif type(rsg) == str:
try: rsg_data = base64.urlsafe_b64decode(rsg)
except: pass
try: rsg_data = base64.b32decode(rsg)
except: pass
if not rsg_data: return False
else:
siglen = Identity.SIGLENGTH//8
if len(rsg_data) == siglen: return rsg_data, None
if len(rsg_data) < siglen+1: return False
else:
signature = rsg_data[:siglen]
envelope = rsg_data[siglen:]
try: signed_data = mp.unpackb(envelope)
except: return False
if not "hashtype" in signed_data or not "hash" in signed_data: return False, None
if not signed_data["hash"] in RSG_HASHTYPES: return False, None
if not "meta" in signed_data: return False, None
if not "signer" in signed_data["meta"]: return False, None
if not "pubkey" in signed_data["meta"]: return False, None
if not "note" in signed_data["meta"]: return False, None
return signature, signed_data
def create_rsg(signer_identity, signature_input, note=None, meta=None, output="bin"):
if not output in ["bin", "hex", "base32", "base64"]: raise TypeError(f"Invalid output format for rsg creation")
if not type(signer_identity) == RNS.Identity.Identity: raise TypeError(f"{signer_identity} is not a Reticulum Identity")
if not signer_identity.get_private_key(): raise ValueError(f"{signer_identity} does not hold a private key")
if type(signature_input) == bytes: sha = sha256(signature_input)
elif type(signature_input) == str: sha = sha256(signature_input.encode("utf-8"))
elif type(signature_input) == io.BufferReader: sha = file_sha256(signature_input)
else: raise TypeError(f"Invalid input type {type(signature_input)} for rsg creation")
signed_data = { "hashtype": "sha256", "hash": sha,
"meta": { "signer": identity.hash,
"pubkey": identity.get_public_key(),
"note" : note } }
if meta and type(meta) == dict:
for key in meta:
if not key in signed_data["meta"]: signed_data["meta"]["key"] = meta["key"]
envelope = mp.packb(signed_data)
signature = signer_identity.sign(envelope)
return signature+envelope
def validate(args, identity):
sig_ext = f".{SIG_EXT}"
validate_path = os.path.expanduser(args.validate)
path_is_sigfile = validate_path.lower().endswith(sig_ext)
if path_is_sigfile: signature_path = validate_path; file_path = validate_path[:-len(sig_ext)]
else: signature_path = f"{validate_path}{sig_ext}"; file_path = validate_path
signature_exists = os.path.isfile(signature_path)
file_exists = os.path.isfile(file_path)
if not file_exists: print(f"The validation target \"{file_path}\" does not exist"); exit(R_NO_FILE)
if not signature_exists: print(f"No signature file exists for \"{file_path}\""); exit(R_NO_FILE)
try:
with open(signature_path, "rb") as fh: signature = fh.read()
except Exception as e: print(f"Could not read signature: {e}"); exit(R_READ_ERROR)
try:
with open(file_path, "rb") as fh: valid = identity.validate(signature, fh.read())
if not valid: print(f"Invalid signature {signature_path} for file {file_path}"); exit(R_INVALID_SIGNATURE)
else: print(f"Signature is valid, the file {file_path} was signed by {identity}."); exit(R_OK)
except Exception as e: print(f"Could not validate signature: {e}"); exit(R_READ_ERROR)
def sign(args, identity):
sig_ext = f".{SIG_EXT}"
sign_path = os.path.expanduser(args.sign)
signature_path = f"{sign_path}{sig_ext}"
file_exists = os.path.isfile(sign_path)
signature_exists = os.path.isfile(signature_path)
if not identity.get_private_key(): print(f"Cannot sign \"{sign_path}\", the identity does not hold a private key"); exit(R_NO_PRVKEY)
if not file_exists: print(f"The file \"{sign_path}\" does not exist"); exit(R_NO_FILE)
if signature_exists and not args.force:
print(f"The signature file \"{signature_path}\" already exists, not overwriting"); exit(R_FILE_EXISTS)
try:
if args.raw:
with open(sign_path, "rb") as fh: data = fh.read()
with open(signature_path, "wb") as fh: fh.write(identity.sign(data))
else:
with open(sign_path, "rb") as in_file:
with open(signature_path, "rb") as out_file:
out_file.write(create_rsg(identity, in_file))
print(f"Signed file {sign_path} with {identity}"); exit(R_OK)
except Exception as e: print(f"Could not sign {sign_path}: {e}"); exit(R_UNKNOWN_ERROR)
def encrypt(args, identity):
pass
def decrypt(args, identity):
pass
################
# File Output #
################
def write_identity(args, identity):
try:
wp = os.path.expanduser(args.write)
args.write = False
if identity.get_private_key() and args.export_prv:
if not os.path.isfile(wp) or args.force:
identity.to_file(wp)
print("Wrote private identity to "+str(wp))
else: print("File "+str(wp)+" already exists, not overwriting"); exit(R_FILE_EXISTS)
elif identity.get_public_key():
if not wp.lower().endswith(f".{PUB_EXT}"): wp += f".{PUB_EXT}"
if not os.path.isfile(wp) or args.force:
identity.pub_to_file(wp)
print("Wrote public identity to "+str(wp))
else: print("File "+str(wp)+" already exists, not overwriting"); exit(R_FILE_EXISTS)
else: print("Identity holds neither a public nor private key"); exit(R_NO_KEYS)
except Exception as e: print("Error while writing imported identity to file: "+str(e)); exit(R_WRITE_ERROR)
###################
# Terminal Output #
###################
def print_identity_information(args, identity):
print("Identity Hash : "+RNS.prettyhexrep(identity.hash))
if args.base64: print("Public Key : "+base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8"))
elif args.base32: print("Public Key : "+base64.b32encode(identity.get_public_key()).decode("utf-8"))
else: print("Public Key : "+RNS.hexrep(identity.get_public_key(), delimit=False))
if identity.prv:
if args.print_private:
if args.base64: print("Private Key : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8"))
elif args.base32: print("Private Key : "+base64.b32encode(identity.get_private_key()).decode("utf-8"))
else: print("Private Key : "+RNS.hexrep(identity.get_private_key(), delimit=False))
else: print("Private Key : Hidden")
def print_hash_information(args, identity):
try:
aspects = args.hash.split(".")
if not len(aspects) > 0: print("Invalid destination aspects specified"); exit(R_INVALID_ASPECTS)
else:
app_name = aspects[0]; aspects = aspects[1:]
if not identity.get_public_key(): print("Identity does not hold a public key"); exit(R_NO_PUBKEY)
else:
destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects)
print("The "+str(args.hash)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash))
print("The full destination specifier is "+str(destination))
except Exception as e: print(f"An error ocurred while attempting to get hash information: {e}"); exit(R_UNKNOWN_ERROR)
def export_pub_identity(args, identity):
k = identity.get_public_key()
if not k: print("Identity doesn't hold a public key, cannot export"); exit(R_NO_PUBKEY)
else:
if args.base64: print("Public Identity Keys : "+base64.urlsafe_b64encode(k).decode("utf-8"))
elif args.base32: print("Public Identity Keys : "+base64.b32encode(k).decode("utf-8"))
else: print("Public Identity Keys : "+RNS.hexrep(k, delimit=False))
def export_prv_identity(args, identity):
k = identity.get_private_key()
if not k: print("Identity doesn't hold a private key, cannot export"); exit(R_NO_PRVKEY)
else:
if args.base64: print("Private Identity Keys : "+base64.urlsafe_b64encode(k).decode("utf-8"))
elif args.base32: print("Private Identity Keys : "+base64.b32encode(k).decode("utf-8"))
else: print("Private Identity Keys : "+RNS.hexrep(k, delimit=False))
##############################
# Helper & Utility Functions #
##############################
def spin(until=None, msg=None, timeout=None):
i = 0
syms = "⢄⢂⢁⡁⡈⡐⡠"
if timeout != None: timeout = time.time()+timeout
print(msg+" ", end=" ")
while (timeout == None or time.time()<timeout) and not until():
time.sleep(0.1)
print(("\b\b"+syms[i]+" "), end="")
sys.stdout.flush()
i = (i+1)%len(syms)
print("\r"+" "*len(msg)+" \r", end="")
if timeout != None and time.time() > timeout: return False
else: return True
if __name__ == "__main__":
main()
############ Legacy reference
# if identity != None:
# data_output = None
# if args.encrypt and not args.write and not args.stdout and args.read:
# args.write = str(args.read)+"."+ENCRYPT_EXT
# if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith("."+ENCRYPT_EXT):
# args.write = str(args.read).replace("."+ENCRYPT_EXT, "")
# if args.write:
# if not args.force and os.path.isfile(args.write):
# print("Output file "+str(args.write)+" already exists. Not overwriting.")
# exit(15)
# else:
# try:
# data_output = open(args.write, "wb")
# except Exception as e:
# print("Could not open output file for writing")
# print("The contained exception was: "+str(e))
# exit(15)
# # TODO: Actually expand this to a good solution
# # probably need to create a wrapper that takes
# # into account not closing stdout when done
# # elif args.stdout:
# # data_output = sys.stdout
# if args.sign:
# if identity.prv == None:
# print("Specified Identity does not hold a private key. Cannot sign.")
# exit(16)
# if not data_input:
# if not args.stdout:
# print("Signing requested, but no input data specified")
# exit(17)
# else:
# if not data_output:
# if not args.stdout:
# print("Signing requested, but no output specified")
# exit(18)
# if not args.stdout:
# print("Signing "+str(args.read))
# try:
# data_output.write(identity.sign(data_input.read()))
# data_output.close()
# data_input.close()
# if not args.stdout:
# if args.read:
# print("File "+str(args.read)+" signed with "+str(identity)+" to "+str(args.write))
# exit(0)
# except Exception as e:
# if not args.stdout:
# print("An error ocurred while encrypting data.")
# print("The contained exception was: "+str(e))
# try:
# data_output.close()
# except:
# pass
# try:
# data_input.close()
# except:
# pass
# exit(19)
# if args.encrypt:
# if not data_input:
# if not args.stdout:
# print("Encryption requested, but no input data specified")
# exit(24)
# else:
# if not data_output:
# if not args.stdout:
# print("Encryption requested, but no output specified")
# exit(25)
# if not args.stdout:
# print("Encrypting "+str(args.read))
# try:
# more_data = True
# while more_data:
# chunk = data_input.read(CHUNK_SIZE)
# if chunk:
# data_output.write(identity.encrypt(chunk))
# else:
# more_data = False
# data_output.close()
# data_input.close()
# if not args.stdout:
# if args.read:
# print("File "+str(args.read)+" encrypted for "+str(identity)+" to "+str(args.write))
# exit(0)
# except Exception as e:
# if not args.stdout:
# print("An error ocurred while encrypting data.")
# print("The contained exception was: "+str(e))
# try:
# data_output.close()
# except:
# pass
# try:
# data_input.close()
# except:
# pass
# exit(26)
# if args.decrypt:
# if identity.prv == None:
# print("Specified Identity does not hold a private key. Cannot decrypt.")
# exit(27)
# if not data_input:
# if not args.stdout:
# print("Decryption requested, but no input data specified")
# exit(28)
# else:
# if not data_output:
# if not args.stdout:
# print("Decryption requested, but no output specified")
# exit(29)
# if not args.stdout:
# print("Decrypting "+str(args.read)+"...")
# try:
# more_data = True
# while more_data:
# chunk = data_input.read(CHUNK_SIZE)
# if chunk:
# plaintext = identity.decrypt(chunk)
# if plaintext == None:
# if not args.stdout:
# print("Data could not be decrypted with the specified Identity")
# exit(30)
# else:
# data_output.write(plaintext)
# else:
# more_data = False
# data_output.close()
# data_input.close()
# if not args.stdout:
# if args.read:
# print("File "+str(args.read)+" decrypted with "+str(identity)+" to "+str(args.write))
# exit(0)
# except Exception as e:
# if not args.stdout:
# print("An error ocurred while decrypting data.")
# print("The contained exception was: "+str(e))
# try: data_output.close()
# except: pass
# try: data_input.close()
# except: pass
# exit(31)