mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-06-08 14:11:53 -07:00
Base256 encoding
This commit is contained in:
+35
-14
@@ -90,7 +90,7 @@ def validate_args(args):
|
||||
g = 0;
|
||||
for a in [args.base64, args.base32, args.hex]:
|
||||
if a: g += 1
|
||||
if g > 1: print("The -b, -B and --hex args are mutually exclusive"); exit(1)
|
||||
if g > 1: print("The -b, -B, --hex and --base256 args are mutually exclusive"); exit(1)
|
||||
|
||||
return True
|
||||
|
||||
@@ -137,6 +137,7 @@ def main():
|
||||
parser.add_argument("-b", "--base64", action="store_true", default=False, help="Use base64-encoded input and output")
|
||||
parser.add_argument("-B", "--base32", action="store_true", default=False, help="Use base32-encoded input and output")
|
||||
parser.add_argument("--hex", action="store_true", default=False, help="Use hex-encoded input and output")
|
||||
parser.add_argument("--base256", action="store_true", default=False, help="Use base256-encoded input and output")
|
||||
|
||||
parser.add_argument("--version", action="version", version="rnid {version}".format(version=__version__))
|
||||
|
||||
@@ -391,6 +392,8 @@ def get_rsg_data(rsg):
|
||||
except: pass
|
||||
try: rsg_data = bytes.fromhex(rsg.strip(RSG_PADDING))
|
||||
except: pass
|
||||
try: rsg_data = RNS.b256_to_bytes(rsg.strip(RSG_PADDING.decode("utf-8")))
|
||||
except: pass
|
||||
|
||||
return rsg_data
|
||||
|
||||
@@ -463,9 +466,9 @@ def validate_rsg(rsg, message=None, required_signer=None):
|
||||
return False, signed_data, signing_identity
|
||||
|
||||
def create_rsg(signer_identity, message, note=None, meta=None, output="bin"):
|
||||
if not output in ["bin", "hex", "base32", "base64"]: raise TypeError(f"Invalid output format for rsg creation")
|
||||
if not type(signer_identity) == RNS.Identity: raise TypeError(f"{signer_identity} is not a Reticulum Identity")
|
||||
if not signer_identity.get_private_key(): raise ValueError(f"{signer_identity} does not hold a private key")
|
||||
if not output in ["bin", "hex", "base32", "base256", "base64"]: raise TypeError(f"Invalid output format for rsg creation")
|
||||
if not type(signer_identity) == RNS.Identity: raise TypeError(f"{signer_identity} is not a Reticulum Identity")
|
||||
if not signer_identity.get_private_key(): raise ValueError(f"{signer_identity} does not hold a private key")
|
||||
|
||||
signed_data = { "hashtype": "sha256", "hash": get_rsg_hash(message),
|
||||
"meta": { "signer": signer_identity.hash,
|
||||
@@ -480,11 +483,12 @@ def create_rsg(signer_identity, message, note=None, meta=None, output="bin"):
|
||||
signature = signer_identity.sign(envelope)
|
||||
rsg_data = signature+envelope
|
||||
|
||||
if output == "bin": rsg = rsg_data
|
||||
elif output == "hex": rsg = RNS.hexrep(rsg_data, delimit=False).encode("ascii")
|
||||
elif output == "base32": rsg = base64.b32encode(rsg_data)
|
||||
elif output == "base64": rsg = base64.urlsafe_b64encode(rsg_data)
|
||||
else: return None
|
||||
if output == "bin": rsg = rsg_data
|
||||
elif output == "hex": rsg = RNS.hexrep(rsg_data, delimit=False).encode("ascii")
|
||||
elif output == "base32": rsg = base64.b32encode(rsg_data)
|
||||
elif output == "base64": rsg = base64.urlsafe_b64encode(rsg_data)
|
||||
elif output == "base256": rsg = RNS.b256rep(rsg_data)
|
||||
else: return None
|
||||
|
||||
return rsg
|
||||
|
||||
@@ -493,6 +497,7 @@ RSG_ASCII_FOOTER = b" End of rsg data ####"
|
||||
RSG_ASCII_ROW_WIDTH = 64
|
||||
RSG_PADDING = b"="
|
||||
def wrap_rsg(rsg):
|
||||
if type(rsg) == str: return wrap_rsg_str(rsg)
|
||||
def pad(chunk): return chunk+(RSG_ASCII_ROW_WIDTH-len(chunk))*RSG_PADDING
|
||||
header = RSG_ASCII_HEADER+b"#"*(RSG_ASCII_ROW_WIDTH-len(RSG_ASCII_HEADER))
|
||||
footer = b"#"*(RSG_ASCII_ROW_WIDTH-len(RSG_ASCII_FOOTER))+RSG_ASCII_FOOTER
|
||||
@@ -507,6 +512,21 @@ def wrap_rsg(rsg):
|
||||
wrapped += footer
|
||||
return wrapped.decode("ascii")
|
||||
|
||||
def wrap_rsg_str(rsg):
|
||||
def pad(chunk): return chunk+(RSG_ASCII_ROW_WIDTH-len(chunk))*RSG_PADDING.decode("utf-8")
|
||||
header = RSG_ASCII_HEADER.decode("utf-8")+"#"*(RSG_ASCII_ROW_WIDTH-len(RSG_ASCII_HEADER.decode("utf-8")))
|
||||
footer = "#"*(RSG_ASCII_ROW_WIDTH-len(RSG_ASCII_FOOTER.decode("utf-8")))+RSG_ASCII_FOOTER.decode("utf-8")
|
||||
wrapped = header+"\n"
|
||||
read = 0
|
||||
while len(rsg):
|
||||
chunk = rsg[:RSG_ASCII_ROW_WIDTH]
|
||||
if len(chunk) < RSG_ASCII_ROW_WIDTH: chunk = pad(chunk)
|
||||
wrapped += chunk+"\n"; read += len(chunk)
|
||||
rsg = rsg[len(chunk):]
|
||||
|
||||
wrapped += footer
|
||||
return wrapped
|
||||
|
||||
def unwrap_rsg(wrapped_rsg):
|
||||
unwrapped = ""
|
||||
if type(wrapped_rsg) == bytes: wrapped_rsg = wrapped_rsg.decode("ascii")
|
||||
@@ -584,10 +604,11 @@ def sign(args, identity):
|
||||
file_exists = os.path.isfile(sign_path)
|
||||
signature_exists = os.path.isfile(rsg_path)
|
||||
|
||||
if args.base32: output = "base32"
|
||||
elif args.base64: output = "base64"
|
||||
elif args.hex: output = "hex"
|
||||
else: output = "bin"
|
||||
if args.base32: output = "base32"
|
||||
elif args.base64: output = "base64"
|
||||
elif args.base256: output = "base256"
|
||||
elif args.hex: output = "hex"
|
||||
else: output = "bin"
|
||||
|
||||
if not identity.get_private_key(): print(f"Cannot sign \"{sign_path}\", the identity does not hold a private key"); exit(R_NO_PRVKEY)
|
||||
if not file_exists: print(f"The file \"{sign_path}\" does not exist"); exit(R_NO_FILE)
|
||||
@@ -606,7 +627,7 @@ def sign(args, identity):
|
||||
if output == "bin":
|
||||
with open(rsg_path, "wb") as out_file: out_file.write(rsg)
|
||||
|
||||
elif output == "base32" or output == "base64" or output == "hex": print(f"\n{wrap_rsg(rsg)}\n")
|
||||
elif output in ["base32", "base64", "base256", "hex"]: print(f"\n{wrap_rsg(rsg)}\n")
|
||||
else: print("No valid output format specified")
|
||||
|
||||
print(f"Signed file {sign_path} with {identity}"); exit(R_OK)
|
||||
|
||||
+26
-6
@@ -198,11 +198,6 @@ def prettyhexrep(data):
|
||||
hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">"
|
||||
return hexrep
|
||||
|
||||
def prettyb256rep(data):
|
||||
delimiter = ""
|
||||
b256rep = "<"+delimiter.join(b256_rep(c) for c in data)+">"
|
||||
return b256rep
|
||||
|
||||
def prettyspeed(num, suffix="b"):
|
||||
return prettysize(num/8, suffix=suffix)+"ps"
|
||||
|
||||
@@ -555,6 +550,8 @@ class Profiler:
|
||||
|
||||
profile = Profiler.get_profiler
|
||||
|
||||
# The base-256 table is likely to change. Currently, it is just
|
||||
# experimental, so don't count on it too much just yet.
|
||||
b256 = [
|
||||
# 0 1 2 3 4 5 6 7 8 9 A B C D F F
|
||||
"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p", # 0x0 Latin & numerals
|
||||
@@ -575,4 +572,27 @@ b256 = [
|
||||
"𐌳","𐌸","𐌾","𐐀","𐐁","𐐂","𐐆","𐐇","𐐈","𐐉","𐐊","𐐋","𐐌","𐐍","𐐎","𐐏", # 0xF Gothic & Deseret
|
||||
]
|
||||
|
||||
def b256_rep(input_byte): return b256[int(input_byte)]
|
||||
def b256rep(data): return "".join(bytes_to_b256(data))
|
||||
def prettyb256rep(data): return f"<{b256rep(data)}>"
|
||||
|
||||
def b256_to_byte(point):
|
||||
if not type(point) == str or not len(point) == 1: raise TypeError("Invalid input data for base256 byte decode")
|
||||
try: return b256.index(point)
|
||||
except Exception as e: raise ValueError(f"Could not decode base256 byte: {e}")
|
||||
|
||||
def b256_to_bytes(b256rep):
|
||||
if not type(b256rep) == str: raise TypeError("Invalid input data for base256 decode")
|
||||
try: return bytes([b256.index(c) for c in b256rep])
|
||||
except Exception as e: raise ValueError(f"Could not decode base256: {e}")
|
||||
|
||||
def byte_to_b256(input_byte):
|
||||
if type(input_byte) == bytes and not len(input_byte) == 1: TypeError("Invalid input data for base256 byte encode")
|
||||
if type(input_byte) == bytes and len(input_byte) == 1: input_byte = ord(input_byte)
|
||||
if not type(input_byte) == int: raise TypeError("Invalid input data for base256 byte encode")
|
||||
try: return b256[int(input_byte)]
|
||||
except Exception as e: raise TypeError(f"Could not encode byte to base256: {e}")
|
||||
|
||||
def bytes_to_b256(data):
|
||||
if not type(data) == bytes: raise TypeError("Invalid input data for base256 encode")
|
||||
try: return [byte_to_b256(c) for c in data]
|
||||
except Exception as e: raise TypeError(f"Could not encode to base256: {e}")
|
||||
|
||||
Reference in New Issue
Block a user