Added work document permissions control logic and CLI interaction to rngit. Added ability to create comments/updates on work documents from allowed identities.

This commit is contained in:
Mark Qvist
2026-05-07 00:42:37 +02:00
parent d1c59ef3b6
commit 8b2ba9907f
+233 -2
View File
@@ -1009,6 +1009,61 @@ class ReticulumGitClient():
finally:
if self.link: self.link.teardown()
def work_permissions(self, remote=None, doc_id=None):
if not remote: print(f"No remote specified"); exit(1)
if doc_id is None: print(f"No document ID specified"); exit(1)
self.connect_remote(remote)
timeout = self.link_timeout
while not self.link_ready and not self.link_failed and timeout > 0:
time.sleep(0.2)
timeout -= 1
if not self.link_ready: self.abort("Failed to establish link")
print("\r \r", end="")
try:
destination_hash, group, repo = self.parse_remote_url(remote)
repo_path = f"{group}/{repo}"
request_data = {self.IDX_REPOSITORY: repo_path,
"operation": "perms", "doc_id": doc_id, "step": "get"}
response, metadata = self.send_request(self.PATH_WORK, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Remote error: {error_msg}")
if len(response) > 1:
result = mp.unpackb(response[1:])
current_content = result.get("content", "")
else: current_content = ""
content = self._edit_permissions(doc_id=doc_id, content=current_content)
if content is None: print("Edit cancelled"); return
request_data = {self.IDX_REPOSITORY: repo_path,
"operation": "perms", "doc_id": doc_id, "step": "set",
"content": content}
response, metadata = self.send_request(self.PATH_WORK, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Remote error: {error_msg}")
print(f"Permissions updated for work document #{doc_id}")
except Exception as e: self.abort(f"Error editing permissions: {e}")
finally:
if self.link: self.link.teardown()
def _edit_work_content(self, title="", content="", is_comment=False):
editor = os.environ.get("EDITOR", "")
if not editor:
@@ -1054,6 +1109,42 @@ class ReticulumGitClient():
RNS.log(f"Error editing work content: {e}", RNS.LOG_ERROR)
return None
def _edit_permissions(self, doc_id=None, content=""):
editor = os.environ.get("EDITOR", "")
if not editor:
for fallback in ["nano", "vim", "vi"]:
try:
subprocess.run(["which", fallback], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
editor = fallback
break
except subprocess.CalledProcessError: continue
if not editor:
print("No editor found. Please set $EDITOR environment variable.")
return None
if content: template = content
else: template = DOC_PERMISSIONS_TEMPLATE
try:
with NamedTemporaryFile(mode="w+", suffix=".txt", delete=False) as tmp:
tmp_path = tmp.name
tmp.write(template)
result = subprocess.run([editor, tmp_path])
if result.returncode != 0:
print(f"Editor exited with error code {result.returncode}")
os.unlink(tmp_path)
return None
with open(tmp_path, "r") as f: edited = f.read()
os.unlink(tmp_path)
return edited
except Exception as e:
RNS.log(f"Error editing permissions: {e}", RNS.LOG_ERROR)
return None
class ReticulumGitNode():
JOBS_INTERVAL = 5
@@ -1383,10 +1474,10 @@ class ReticulumGitNode():
allowed_input = None
if os.path.isdir(work_path) and os.path.isfile(doc_allowed_path):
try:
with open(doc_allowed_path, "r") as fh: allowed_input = f.read()
with open(doc_allowed_path, "r") as fh: allowed_input = fh.read()
except Exception as e: RNS.log(f"Error while resolving document permission for {group_name}/{repository_name}/{doc_id}: {e}", RNS.LOG_ERROR)
doc_allowed_permissions = permissions_from_allowed_input(allowed_input)
doc_allowed_permissions = self.permissions_from_allowed_input(allowed_input)
if permission == self.PERM_READ:
repository_permissions = self.groups[group_name]["repositories"][repository_name]["read"]
@@ -2189,13 +2280,26 @@ class ReticulumGitNode():
if not read_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if operation in ["comment"]:
if data.get("doc_id", None):
try: doc_id = int(data.get("doc_id", None))
except: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
interact_access |= self.resolve_doc_permission(remote_identity, group_name, repository_name, doc_id, self.PERM_INTERACT)
comment_access = interact_access and (read_access or write_access)
manage_access = interact_access and write_access
RNS.log(f"hash : {RNS.prettyhexrep(remote_identity.hash)}")
RNS.log(f"read : {read_access}")
RNS.log(f"wite : {write_access}")
RNS.log(f"interact : {interact_access}")
RNS.log(f"manage : {manage_access}")
if operation in ["list", "view"] and read_access: access = True
elif operation in ["comment"] and comment_access: access = True
elif operation in ["create", "edit", "delete"] and manage_access: access = True
elif operation in ["complete", "activate"] and manage_access: access = True
elif operation in ["perms"] and manage_access: access = True
else: access = False
if not access: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
@@ -2212,6 +2316,7 @@ class ReticulumGitNode():
elif operation == "delete" and manage_access: return self._work_delete(work_path, data, remote_identity)
elif operation == "complete" and manage_access: return self._work_complete(work_path, data, remote_identity)
elif operation == "activate" and manage_access: return self._work_activate(work_path, data, remote_identity)
elif operation == "perms" and manage_access: return self._work_perms(work_path, data, remote_identity)
else: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
except Exception as e:
@@ -2533,6 +2638,132 @@ class ReticulumGitNode():
RNS.log(f"Error activating work document: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def _work_perms(self, work_path, data, remote_identity):
step = data.get("step")
group_name, repository_name = self.parse_request_repository_path(data[self.IDX_REPOSITORY])
read_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_READ)
write_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_WRITE)
interact_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_INTERACT)
manage_access = interact_access and write_access
if not read_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if not manage_access: return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
if not step: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
if step == "get": return self._work_get_permissions(work_path, data, remote_identity, group_name, repository_name)
elif step == "set": return self._work_set_permissions(work_path, data, remote_identity, group_name, repository_name)
else: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid step"
def _work_get_permissions(self, work_path, data, remote_identity, group_name, repository_name):
doc_id = data.get("doc_id")
if doc_id is None: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No document ID specified"
try: doc_id = int(doc_id)
except: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid document ID"
scope = None
doc_dir = None
for s in ["active", "completed"]:
d = os.path.join(work_path, s, str(doc_id))
if os.path.isdir(d):
scope = s
doc_dir = d
break
if not doc_dir: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Document not found"
root_path = os.path.join(doc_dir, "root")
doc = self._work_load_document(root_path)
if not doc: return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error loading document"
is_author = doc.get("meta", {}).get("author") == remote_identity.hash
read_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_READ)
write_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_WRITE)
interact_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_INTERACT)
admin_access = self.resolve_doc_permission(remote_identity, group_name, repository_name, doc_id, self.PERM_ADMIN)
manage_access = interact_access and write_access
if not ((is_author and manage_access) or admin_access): return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
try:
allowed_path = work_path + f"/{doc_id}.allowed"
if os.path.isfile(allowed_path):
with open(allowed_path, "r", encoding="utf-8") as f: content = f.read()
else: content = ""
return b"\x00" + mp.packb({"content": content})
except Exception as e:
RNS.log(f"Error getting document permissions: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + f"Error getting permissions: {e}".encode("utf-8")
def _work_set_permissions(self, work_path, data, remote_identity, group_name, repository_name):
doc_id = data.get("doc_id")
content = data.get("content", "")
if doc_id is None: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No document ID specified"
try: doc_id = int(doc_id)
except: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid document ID"
scope = None
doc_dir = None
for s in ["active", "completed"]:
d = os.path.join(work_path, s, str(doc_id))
if os.path.isdir(d):
scope = s
doc_dir = d
break
if not doc_dir: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Document not found"
root_path = os.path.join(doc_dir, "root")
doc = self._work_load_document(root_path)
if not doc: return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error loading document"
is_author = doc.get("meta", {}).get("author") == remote_identity.hash
read_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_READ)
write_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_WRITE)
interact_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_INTERACT)
admin_access = self.resolve_doc_permission(remote_identity, group_name, repository_name, doc_id, self.PERM_ADMIN)
manage_access = interact_access and write_access
RNS.log(f"author : {is_author}")
RNS.log(f"read : {read_access}")
RNS.log(f"wite : {write_access}")
RNS.log(f"interact : {interact_access}")
RNS.log(f"admin : {admin_access}")
RNS.log(f"manage : {manage_access}")
if not ((is_author and manage_access) or admin_access): return self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
valid = True
error_line = None
invalid_perm = ""
for line_num, line in enumerate(content.splitlines(), 1):
stripped = line.strip()
if not stripped or stripped.startswith("#"): continue
perm, target = self.parse_permission(stripped)
if perm is None or target is None:
valid = False
error_line = line_num
invalid_perm = stripped
break
if not valid: return self.RES_INVALID_REQ.to_bytes(1, "big") + f"Invalid permission \"{invalid_perm}\" on line {error_line}".encode("utf-8")
try:
allowed_path = work_path + f"/{doc_id}.allowed"
tmp_path = allowed_path + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as f: f.write(content)
os.rename(tmp_path, allowed_path)
RNS.log(f"Permissions for work document {group_name}/{repository_name}/{doc_id} updated by {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG)
return b"\x00"
except Exception as e:
RNS.log(f"Error setting permissions: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + f"Error setting permissions: {e}".encode("utf-8")
def repository_stats(self, remote_identity, group_name, repository_name, lookback_days=14):
if not self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_STATS): return None
else: