Added fork and mirror indications to rngit page node

This commit is contained in:
Mark Qvist
2026-05-16 21:14:02 +02:00
parent 038981474a
commit 0c68f6491a
2 changed files with 67 additions and 1 deletions
+35
View File
@@ -314,6 +314,16 @@ class NomadNetworkNode():
field_str = "`" + "|".join(field_parts)
return f"`[{sanitize_label(_label)}`:{_path}{field_str}]"
def m_link_e(self, _label, remote, _path, **fields):
def sanitize_v(value): return urllib.parse.quote_plus(str(value).encode("utf-8"))
def sanitize_label(value): return value.replace("[", "").replace("]", "").replace("`", "")
field_str = ""
if fields:
field_parts = []
for k, v in fields.items(): field_parts.append(f"{k}={sanitize_v(v)}")
field_str = "`" + "|".join(field_parts)
return f"`!`[{sanitize_label(_label)}`{remote}:{_path}{field_str}]`!"
def m_link(self, _label, _path, **fields):
def sanitize_v(value): return urllib.parse.quote_plus(str(value).encode("utf-8"))
def sanitize_label(value): return value.replace("[", "").replace("]", "").replace("`", "")
@@ -427,6 +437,31 @@ class NomadNetworkNode():
repo = self.get_accessible_repository(remote_identity, group_name, repo_name)
repo_source = ""; source_link = None
if repo["fork"] or repo["mirror"]:
if repo["fork"]: source_type = "fork"; source_url = repo["fork"]
elif repo["mirror"]: source_type = "mirror"; source_url = repo["mirror"]
else: source_type = "retriev"; source_url = "unknown source"
if not source_url.lower().startswith("rns://"): source_link = ""
else:
try:
url_components = source_url.split("/")
if len(url_components) == 5 and len(url_components[2]) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2:
source_repo_dest = bytes.fromhex(url_components[2])
source_group_name = url_components[3]
source_repo_name = url_components[4]
source_identity = RNS.Identity.recall(source_repo_dest)
if source_identity:
source_page_dest = RNS.Destination.hash_from_name_and_identity("nomadnetwork.node", source_identity)
mu_link = self.m_link_e(source_url, RNS.hexrep(source_page_dest, delimit=False), self.PATH_REPO, g=source_group_name, r=source_repo_name)
source_link = f"{mu_link}"
except Exception as e: source_link = ""
source_desc = f"{source_type}ed from"
source_indent = " "*(len(f"Node / {group_name} / {repo_name}")-len(source_desc))
if source_link: source_url = source_link
nav_parts.append(f"{self.CLR_DIM}{source_desc.capitalize()}{source_indent} {source_url}`f\n")
if not repo:
content = self.m_heading("Not Found", 1) + "\nThe requested repository was not found.\n"
return self.render_template(content, nav_content="".join(nav_parts), st=st)
+32 -1
View File
@@ -1972,8 +1972,11 @@ class ReticulumGitNode():
allowed_input = fh.read().decode("utf-8")
fh.close()
fork = self.__is_fork(path)
mirror = self.__is_mirror(path)
p = self.permissions_from_allowed_input(allowed_input)
group["repositories"][repository_name] = { "name": repository_name, "group": group_name, "path": path }
group["repositories"][repository_name] = { "name": repository_name, "group": group_name, "path": path, "fork": fork, "mirror": mirror }
for perm in self.ALL_PERMS: group["repositories"][repository_name][perm] = p[perm] if perm in p else []
return True
@@ -2054,6 +2057,34 @@ class ReticulumGitNode():
except: return False
def __is_fork(self, path):
try:
result = subprocess.run(["git", "config", "repository.rngit.type"], cwd=path, check=True, capture_output=True, text=True)
if not result: return False
else: check = result.stdout.strip()
if not check == "fork": return False
else:
result = subprocess.run(["git", "config", "repository.rngit.upstream.source"], cwd=path, check=True, capture_output=True, text=True)
if not result: return False
else: source = result.stdout.strip()
return source
except: return False
def __is_mirror(self, path):
try:
result = subprocess.run(["git", "config", "repository.rngit.type"], cwd=path, check=True, capture_output=True, text=True)
if not result: return False
else: check = result.stdout.strip()
if not check == "mirror": return False
else:
result = subprocess.run(["git", "config", "repository.rngit.upstream.source"], cwd=path, check=True, capture_output=True, text=True)
if not result: return False
else: source = result.stdout.strip()
return source
except: return False
def register_request_handlers(self):
ga_list = self.global_allowed_list if self.global_allow == RNS.Destination.ALLOW_LIST else None
self.destination.register_request_handler(self.PATH_LIST, self.handle_list, allow=self.global_allow, allowed_list=ga_list)