Implemented remote permissions management in rngit

This commit is contained in:
Mark Qvist
2026-05-17 16:49:44 +02:00
parent 10156cc90e
commit 128455ef01
+387 -10
View File
@@ -89,6 +89,12 @@ def program_setup(configdir, rnsconfigdir=None, verbosity=0, quietness=0, servic
elif operation == "latest": git_client.latest_release(remote=task["remote"], target=task["target"])
else: print("Invalid operation"); exit(1)
elif command == "perms":
git_client = ReticulumGitClient(configdir=configdir, verbosity=targetverbosity, identitypath=identity)
if operation == "gperms": git_client.group_permissions(remote=task["remote"])
elif operation == "rperms": git_client.repository_permissions(remote=task["remote"])
else: print("Invalid operation"); exit(1)
elif command == "work":
git_client = ReticulumGitClient(configdir=configdir, verbosity=targetverbosity, identitypath=identity)
scope = task.get("scope", "active")
@@ -124,7 +130,7 @@ def program_setup(configdir, rnsconfigdir=None, verbosity=0, quietness=0, servic
exit(0)
def main():
subcommands = ["node", "release", "work", "create", "fork", "sync", "mirror"]
subcommands = ["node", "release", "perms", "work", "create", "fork", "sync", "mirror"]
try:
if len(sys.argv) < 2 or sys.argv[1] not in subcommands: subcommand = "node"
else: subcommand = sys.argv[1]; sys.argv.pop(1)
@@ -154,6 +160,13 @@ def main():
parser.add_argument("operation", nargs="?", default=None, help="list, view, create, latest or delete", type=str)
parser.add_argument("target", nargs="?", default=None, help="tag and path to release artifacts directory", type=str)
elif subcommand == "perms":
parser = argparse.ArgumentParser(description="Reticulum Git Release Manager")
parser.add_argument("--config", action="store", default=None, help="path to alternative config directory", type=str)
parser.add_argument("--rnsconfig", action="store", default=None, help="path to alternative Reticulum config directory", type=str)
parser.add_argument("-i", "--identity", action="store", metavar="PATH", default=None, help="path to release identity", type=str)
parser.add_argument("remote", default=None, help="URL of remote group or repository", type=str)
elif subcommand == "work":
parser = argparse.ArgumentParser(description="Reticulum Git Work Document Manager")
parser.add_argument("--config", action="store", default=None, help="path to alternative config directory", type=str)
@@ -216,8 +229,18 @@ def main():
program_setup(configdir=configarg, rnsconfigdir=rnsconfigarg, service=False, verbosity=args.verbose,
quietness=args.quiet, interactive=False, print_identity=False, task=task, identity=args.identity)
elif subcommand == "perms":
args.remote = args.remote.rstrip("/")
url_components_len = len(args.remote.split("/"))
if url_components_len == 5: operation = "rperms"
elif url_components_len == 4: operation = "gperms"
else: parser.print_help(); print("\nInvalid URL"); exit(1)
task = {"command": subcommand, "operation": operation, "remote": args.remote}
program_setup(configdir=configarg, rnsconfigdir=rnsconfigarg, service=False, verbosity=args.verbose,
quietness=args.quiet, interactive=False, print_identity=False, task=task, identity=args.identity)
elif subcommand == "work":
if not args.operation: parser.print_help()
if not args.operation: parser.print_help(); print()
task = {"command": subcommand, "operation": args.operation, "remote": args.repository,
"scope": args.scope, "doc_id": args.id, "title": args.title}
program_setup(configdir=configarg, rnsconfigdir=rnsconfigarg, service=False, verbosity=args.verbose,
@@ -255,6 +278,7 @@ class ReticulumGitClient():
PATH_MIRROR = "/git/mirror"
PATH_RELEASE = "/mgmt/release"
PATH_WORK = "/mgmt/work"
PATH_PERMS = "/mgmt/perms"
RES_OK = 0x00
RES_DISALLOWED = 0x01
@@ -264,6 +288,7 @@ class ReticulumGitClient():
IDX_REPOSITORY = 0x00
IDX_RESULT_CODE = 0x01
IDX_GROUP = 0x02
PATH_TIMEOUT = 15
LINK_TIMEOUT = 15
@@ -382,8 +407,28 @@ class ReticulumGitClient():
except Exception as e: self.abort(f"Invalid destination hash: {e}")
return destination_hash, components[1], components[2]
def parse_remote_group_url(self, remote):
if not remote.lower().startswith(self.PROTO_SPEC): self.abort("Invalid protocol in remote URL")
components = remote[len(self.PROTO_SPEC):].split("/")
destination_hexhash = self.__resolve_destination_alias(components[0])
if not len(components) == 2: self.abort("Invalid number of URL components")
if not len(destination_hexhash) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2: self.abort("Invalid destination hash length")
try: destination_hash = bytes.fromhex(destination_hexhash)
except Exception as e: self.abort(f"Invalid destination hash: {e}")
return destination_hash, components[1]
def parse_remote_destination_url(self, remote):
if not remote.lower().startswith(self.PROTO_SPEC): self.abort("Invalid protocol in remote URL")
components = remote[len(self.PROTO_SPEC):].split("/")
destination_hexhash = self.__resolve_destination_alias(components[0])
if not len(components): self.abort("Invalid number of URL components")
if not len(destination_hexhash) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2: self.abort("Invalid destination hash length")
try: destination_hash = bytes.fromhex(destination_hexhash)
except Exception as e: self.abort(f"Invalid destination hash: {e}")
return destination_hash
def connect_remote(self, remote):
destination_hash, group, repo = self.parse_remote_url(remote)
destination_hash = self.parse_remote_destination_url(remote)
print(f"Requesting path... ", end="")
if not RNS.Transport.await_path(destination_hash, timeout=self.path_timeout):
print(f"\n", end="")
@@ -965,6 +1010,112 @@ class ReticulumGitClient():
finally:
if self.link: self.link.teardown()
##########################
# Permissions Management #
##########################
def group_permissions(self, remote=None):
if not remote: print(f"No remote specified"); exit(1)
self.connect_remote(remote)
timeout = self.link_timeout
while not self.link_ready and not self.link_failed and timeout > 0:
time.sleep(self.wait_sleep)
timeout -= self.wait_sleep
if not self.link_ready: self.abort("Failed to establish link")
print("\r \r", end="")
try:
destination_hash, group = self.parse_remote_group_url(remote)
request_data = {self.IDX_GROUP: group, "operation": "gperms", "step": "get"}
response, metadata = self.send_request(self.PATH_PERMS, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Remote error: {error_msg}")
if len(response) > 1:
result = mp.unpackb(response[1:])
current_content = result.get("content", "")
else: current_content = ""
content = self._edit_permissions(content=current_content)
if content is None: print("Edit cancelled"); return
request_data = {self.IDX_GROUP: group, "operation": "gperms", "step": "set", "content": content}
response, metadata = self.send_request(self.PATH_PERMS, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Remote error: {error_msg}")
print(f"Permissions updated for group {group}")
except Exception as e: self.abort(f"Error editing permissions: {e}")
finally:
if self.link: self.link.teardown()
def repository_permissions(self, remote=None):
if not remote: print(f"No remote specified"); exit(1)
self.connect_remote(remote)
timeout = self.link_timeout
while not self.link_ready and not self.link_failed and timeout > 0:
time.sleep(self.wait_sleep)
timeout -= self.wait_sleep
if not self.link_ready: self.abort("Failed to establish link")
print("\r \r", end="")
try:
destination_hash, group, repo = self.parse_remote_url(remote)
repo_path = f"{group}/{repo}"
request_data = {self.IDX_REPOSITORY: repo_path, "operation": "rperms", "step": "get"}
response, metadata = self.send_request(self.PATH_PERMS, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Remote error: {error_msg}")
if len(response) > 1:
result = mp.unpackb(response[1:])
current_content = result.get("content", "")
else: current_content = ""
content = self._edit_permissions(content=current_content)
if content is None: print("Edit cancelled"); return
request_data = {self.IDX_REPOSITORY: repo_path, "operation": "rperms", "step": "set", "content": content}
response, metadata = self.send_request(self.PATH_PERMS, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Remote error: {error_msg}")
print(f"Permissions updated for {repo_path}")
except Exception as e: self.abort(f"Error editing permissions: {e}")
finally:
if self.link: self.link.teardown()
########################
# Work Docs Management #
########################
@@ -1440,7 +1591,7 @@ class ReticulumGitClient():
else: current_content = ""
content = self._edit_permissions(doc_id=doc_id, content=current_content)
content = self._edit_permissions(content=current_content)
if content is None: print("Edit cancelled"); return
request_data = {self.IDX_REPOSITORY: repo_path,
@@ -1461,6 +1612,11 @@ class ReticulumGitClient():
finally:
if self.link: self.link.teardown()
##################
# Editor Helpers #
##################
def _edit_work_content(self, title="", content="", is_comment=False):
editor = os.environ.get("EDITOR", "")
if not editor:
@@ -1506,7 +1662,7 @@ class ReticulumGitClient():
RNS.log(f"Error editing work content: {e}", RNS.LOG_ERROR)
return None
def _edit_permissions(self, doc_id=None, content=""):
def _edit_permissions(self, content=""):
editor = os.environ.get("EDITOR", "")
if not editor:
for fallback in ["nano", "vim", "vi"]:
@@ -1521,7 +1677,7 @@ class ReticulumGitClient():
return None
if content: template = content
else: template = DOC_PERMISSIONS_TEMPLATE
else: template = PERMISSIONS_TEMPLATE
try:
with NamedTemporaryFile(mode="w+", suffix=".txt", delete=False) as tmp:
@@ -1581,6 +1737,7 @@ class ReticulumGitNode():
PATH_MIRROR = "/git/mirror"
PATH_RELEASE = "/mgmt/release"
PATH_WORK = "/mgmt/work"
PATH_PERMS = "/mgmt/perms"
RES_OK = 0x00
RES_DISALLOWED = 0x01
@@ -1590,6 +1747,7 @@ class ReticulumGitNode():
IDX_REPOSITORY = 0x00
IDX_RESULT_CODE = 0x01
IDX_GROUP = 0x02
WORK_DOC_LIMIT = 256*1024
@@ -1887,6 +2045,15 @@ class ReticulumGitNode():
if len(group) > limit or len(repository_name) > limit: return None, None
else: return group, repository_name
def parse_request_group_path(self, path):
components = path.split("/")
if not len(components) == 1: return None
else:
limit = 256
group = components[0]
if len(group) > limit: return None
else: return group
def resolve_permission(self, remote_identity, group_name, repository_name, permission):
remote_hash = remote_identity.hash
RNS.log(f"Resolving {group_name}/{repository_name} permission {permission} for {RNS.prettyhexrep(remote_hash)}", RNS.LOG_DEBUG)
@@ -2260,6 +2427,11 @@ class ReticulumGitNode():
except Exception as e: RNS.log(f"Error while running periodic jobs: {e}", RNS.LOG_ERROR)
##################
# System Helpers #
##################
@staticmethod
def _ensure_git():
try: subprocess.run(["git", "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL); return True
@@ -2332,12 +2504,18 @@ class ReticulumGitNode():
except: return False
######################
# Connectivity Setup #
######################
def register_request_handlers(self):
ga_list = self.global_allowed_list if self.global_allow == RNS.Destination.ALLOW_LIST else None
self.destination.register_request_handler(self.PATH_LIST, self.handle_list, allow=self.global_allow, allowed_list=ga_list)
self.destination.register_request_handler(self.PATH_FETCH, self.handle_fetch, allow=self.global_allow, allowed_list=ga_list)
self.destination.register_request_handler(self.PATH_PUSH, self.handle_push, allow=self.global_allow, allowed_list=ga_list)
self.destination.register_request_handler(self.PATH_CREATE, self.handle_create, allow=self.global_allow, allowed_list=ga_list)
self.destination.register_request_handler(self.PATH_PERMS, self.handle_perms, allow=self.global_allow, allowed_list=ga_list)
self.destination.register_request_handler(self.PATH_FORK, self.handle_fork, allow=self.global_allow, allowed_list=ga_list)
self.destination.register_request_handler(self.PATH_SYNC, self.handle_sync, allow=self.global_allow, allowed_list=ga_list)
self.destination.register_request_handler(self.PATH_MIRROR, self.handle_mirror, allow=self.global_allow, allowed_list=ga_list)
@@ -2357,6 +2535,10 @@ class ReticulumGitNode():
self.active_links[link.link_id] = link
RNS.log(f"Peer identified as {link.get_remote_identity()} on {link}", RNS.LOG_DEBUG)
###########################
# Git Operations Handlers #
###########################
def handle_list(self, path, data, request_id, remote_identity, requested_at):
RNS.log(f"List request from remote {remote_identity}", RNS.LOG_DEBUG)
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
@@ -2622,6 +2804,11 @@ class ReticulumGitNode():
RNS.log(f"Error while handling delete request for {group_name}/{repository_name}: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Remote error"
##################################
# Repository Management Handlers #
##################################
def handle_create(self, path, data, request_id, remote_identity, requested_at):
RNS.log(f"Create request from remote {remote_identity}", RNS.LOG_DEBUG)
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
@@ -2840,6 +3027,11 @@ class ReticulumGitNode():
RNS.log(f"Error while handling sync request for {group_name}/{repository_name}: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Remote error"
###############################
# Release Management Handlers #
###############################
def handle_release(self, path, data, request_id, remote_identity, requested_at):
RNS.log(f"Release request from remote {remote_identity}", RNS.LOG_DEBUG)
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
@@ -3224,9 +3416,9 @@ class ReticulumGitNode():
RNS.log(f"Error setting latest release for {releases_path}: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Remote error"
#########################
# Work Document Methods #
#########################
##########################
# Work Document Handlers #
##########################
def handle_work(self, path, data, request_id, remote_identity, requested_at):
RNS.log(f"Work request from remote {remote_identity}", RNS.LOG_DEBUG)
@@ -3862,6 +4054,191 @@ class ReticulumGitNode():
RNS.log(f"Error setting permissions: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error setting permissions"
##################################
# Permission Management Handlers #
##################################
def handle_perms(self, path, data, request_id, remote_identity, requested_at):
RNS.log(f"Permissions request from remote {remote_identity}", RNS.LOG_DEBUG)
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
if not type(data) == dict: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
try:
operation = data.get("operation")
if not operation: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
if operation == "gperms":
if not self.IDX_GROUP in data: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No group specified"
group_name = self.parse_request_group_path(data[self.IDX_GROUP])
read_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_READ)
admin_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_ADMIN)
return self._group_permissions(group_name, data, remote_identity)
elif operation == "rperms":
if not self.IDX_REPOSITORY in data: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No repository specified"
group_name, repository_name = self.parse_request_repository_path(data[self.IDX_REPOSITORY])
read_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_READ)
admin_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_ADMIN)
if not admin_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found" if not read_access else self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
return self._repository_permissions(group_name, repository_name, data, remote_identity)
else: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
except Exception as e:
RNS.log(f"Error while handling permissions request from {remote_identity}: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Remote error"
def _group_permissions(self, group_name, data, remote_identity):
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
if not type(data) == dict: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
read_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_READ)
admin_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_ADMIN)
if not read_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if not admin_access: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
step = data.get("step")
if not step: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
if step == "get": return self._group_get_permissions(group_name, data, remote_identity)
elif step == "set": return self._group_set_permissions(group_name, data, remote_identity)
else: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid step"
def _group_get_permissions(self, group_name, data, remote_identity):
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
if not type(data) == dict: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
read_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_READ)
admin_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_ADMIN)
if not read_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if not admin_access: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
try:
group_path = self.groups[group_name]["path"]
allowed_path = group_path + ".allowed"
if os.path.isfile(allowed_path):
with open(allowed_path, "r", encoding="utf-8") as f: content = f.read()
else: content = ""
return b"\x00" + mp.packb({"content": content})
except Exception as e:
RNS.log(f"Error getting group permissions: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error getting permissions"
def _group_set_permissions(self, group_name, data, remote_identity):
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
if not type(data) == dict: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
read_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_READ)
admin_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_ADMIN)
if not read_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if not admin_access: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
content = data.get("content", "")
valid = True
error_line = None
invalid_perm = ""
for line_num, line in enumerate(content.splitlines(), 1):
stripped = line.strip()
if not stripped or stripped.startswith("#"): continue
perm, target = self.parse_permission(stripped)
if not perm or not target:
valid = False
error_line = line_num
invalid_perm = stripped
break
if not valid: return self.RES_INVALID_REQ.to_bytes(1, "big") + f"Invalid permission \"{invalid_perm}\" on line {error_line}".encode("utf-8")
try:
group_path = self.groups[group_name]["path"]
allowed_path = group_path + ".allowed"
tmp_path = allowed_path + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as f: f.write(content)
os.rename(tmp_path, allowed_path)
RNS.log(f"Permissions for group {group_name} updated by {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG)
return b"\x00"
except Exception as e:
RNS.log(f"Error setting permissions: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error setting permissions"
def _repository_permissions(self, group_name, repository_name, data, remote_identity):
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
if not type(data) == dict: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
read_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_READ)
admin_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_ADMIN)
if not read_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if not admin_access: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
step = data.get("step")
if not step: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
if step == "get": return self._repository_get_permissions(group_name, repository_name, data, remote_identity)
elif step == "set": return self._repository_set_permissions(group_name, repository_name, data, remote_identity)
else: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid step"
def _repository_get_permissions(self, group_name, repository_name, data, remote_identity):
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
if not type(data) == dict: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
read_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_READ)
admin_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_ADMIN)
if not read_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if not admin_access: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
try:
repo_path = self.groups[group_name]["repositories"][repository_name]["path"]
allowed_path = repo_path + ".allowed"
if os.path.isfile(allowed_path):
with open(allowed_path, "r", encoding="utf-8") as f: content = f.read()
else: content = ""
return b"\x00" + mp.packb({"content": content})
except Exception as e:
RNS.log(f"Error getting repository permissions: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error getting permissions"
def _repository_set_permissions(self, group_name, repository_name, data, remote_identity):
if not remote_identity: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not identified"
if not type(data) == dict: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
read_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_READ)
admin_access = self.resolve_group_permission(remote_identity, group_name, self.PERM_ADMIN)
if not read_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if not admin_access: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
content = data.get("content", "")
valid = True
error_line = None
invalid_perm = ""
for line_num, line in enumerate(content.splitlines(), 1):
stripped = line.strip()
if not stripped or stripped.startswith("#"): continue
perm, target = self.parse_permission(stripped)
if not perm or not target:
valid = False
error_line = line_num
invalid_perm = stripped
break
if not valid: return self.RES_INVALID_REQ.to_bytes(1, "big") + f"Invalid permission \"{invalid_perm}\" on line {error_line}".encode("utf-8")
try:
repo_path = self.groups[group_name]["repositories"][repository_name]["path"]
allowed_path = repo_path + ".allowed"
tmp_path = allowed_path + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as f: f.write(content)
os.rename(tmp_path, allowed_path)
RNS.log(f"Permissions for repository {group_name}/{repository_name} updated by {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG)
return b"\x00"
except Exception as e:
RNS.log(f"Error setting permissions: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error setting permissions"
###################
# Node Statistics #
###################
@@ -4287,7 +4664,7 @@ RELEASE_NOTES_TEMPLATE = """# Enter release notes for {TAG}.
COMMENT_TEMPLATE = "# Remove this line and enter your update. Save and exit when done, or save an empty document to abort abort."
CREATE_DOC_TEMPLATE = "# Remove this line and enter your document content. Save and exit when done, or save an empty document to abort abort."
DOC_PERMISSIONS_TEMPLATE ="# No permissions are currently defined for this workdoc. Add them below, and save and exit when you are done."
PERMISSIONS_TEMPLATE ="# No permissions are currently defined for this entity. Add them below, and save and exit when you are done."
REPO_CREATE_PERMS_TEMPLATE = "adm:{IDENTITY_HASH}"