From 35d72f27ed0e4d0c0c8d69aed7139ee95ac653f3 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 2 May 2026 01:02:19 +0200 Subject: [PATCH] Added nomadnet page server to rngit --- RNS/Utilities/rngit/pages.py | 1298 ++++++++++++++++++++++++++++++++- RNS/Utilities/rngit/server.py | 3 +- RNS/Utilities/rngit/util.py | 415 +++++++++++ 3 files changed, 1710 insertions(+), 6 deletions(-) create mode 100644 RNS/Utilities/rngit/util.py diff --git a/RNS/Utilities/rngit/pages.py b/RNS/Utilities/rngit/pages.py index e53ebb64..786551f6 100644 --- a/RNS/Utilities/rngit/pages.py +++ b/RNS/Utilities/rngit/pages.py @@ -28,14 +28,44 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import os +import time +import subprocess +import urllib.parse import RNS from RNS.Utilities.rngit import APP_NAME +from RNS.Utilities.rngit.util import MarkdownToMicron from RNS.vendor.configobj import ConfigObj +from RNS._version import __version__ + +DEFAULT_TEMPLATE = """#!c=0 +>{NODE_NAME} + +{PAGE_CONTENT} +< +- +`a`F666`[Served by rngit {VERSION}`:/page/index.mu] - {GEN_TIME}`f""" class NomadNetworkNode(): - APP_NAME = "nomadnetwork" + APP_NAME = "nomadnetwork" + JOBS_INTERVAL = 5 + BASE_TEMPLATE = DEFAULT_TEMPLATE - JOBS_INTERVAL = 5 + PATH_INDEX = "/page/index.mu" + PATH_GROUP = "/page/group.mu" + PATH_REPO = "/page/repo.mu" + PATH_TREE = "/page/tree.mu" + PATH_BLOB = "/page/blob.mu" + PATH_COMMITS = "/page/commits.mu" + PATH_COMMIT = "/page/commit.mu" + PATH_REFS = "/page/refs.mu" + + BLOB_SIZE_LIMIT = 1024 * 1024 + TREE_ENTRIES_PER_PAGE = 100 + COMMITS_PER_PAGE = 20 + SHOW_DIFF_BY_DEFAULT = True + GIT_COMMAND_TIMEOUT = 5 + MAX_RENDER_WIDTH = 100 def __init__(self, owner=None): if not owner: raise TypeError(f"Invalid owner {owner} for {self}") @@ -48,6 +78,7 @@ class NomadNetworkNode(): self.announce_interval = owner.announce_interval self.last_announce = 0 self.null_ident = RNS.Identity.from_bytes(bytes(64)) + self.md_converter = MarkdownToMicron(max_width=self.MAX_RENDER_WIDTH) self.destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, self.APP_NAME, "node") self.destination.set_link_established_callback(self.remote_connected) @@ -81,8 +112,1224 @@ class NomadNetworkNode(): return self.owner.resolve_permission(remote_identity, group_name, repository_name, permission) def register_request_handlers(self): - # TODO: Implement - pass + self.destination.register_request_handler(self.PATH_INDEX, response_generator=self.serve_front_page, allow=RNS.Destination.ALLOW_ALL) + self.destination.register_request_handler(self.PATH_GROUP, response_generator=self.serve_group_page, allow=RNS.Destination.ALLOW_ALL) + self.destination.register_request_handler(self.PATH_REPO, response_generator=self.serve_repo_page, allow=RNS.Destination.ALLOW_ALL) + self.destination.register_request_handler(self.PATH_TREE, response_generator=self.serve_tree_page, allow=RNS.Destination.ALLOW_ALL) + self.destination.register_request_handler(self.PATH_BLOB, response_generator=self.serve_blob_page, allow=RNS.Destination.ALLOW_ALL) + self.destination.register_request_handler(self.PATH_COMMITS, response_generator=self.serve_commits_page, allow=RNS.Destination.ALLOW_ALL) + self.destination.register_request_handler(self.PATH_COMMIT, response_generator=self.serve_commit_page, allow=RNS.Destination.ALLOW_ALL) + self.destination.register_request_handler(self.PATH_REFS, response_generator=self.serve_refs_page, allow=RNS.Destination.ALLOW_ALL) + + def render_template(self, page_content, st=None): + template = self.BASE_TEMPLATE + template = template.replace("{PAGE_CONTENT}", page_content) + template = template.replace("{NODE_NAME}", self.node_name) + template = template.replace("{VERSION}", __version__) + gt = f"Generated in {RNS.prettytime(time.time()-st)}" if st else "Unknown generation time" + template = template.replace("{GEN_TIME}", gt) + return template.encode("utf-8") + + ############################# + # Micron Generation Helpers # + ############################# + + def m_heading(self, text, level=1): return ">" * level + text + "\n" + def m_bold(self, text): return f"`!{text}`!" + def m_italic(self, text): return f"`*{text}`*" + def m_underline(self, text): return f"`_{text}`_" + def m_color_fg(self, text, color): return f"`F{color}{text}`f" + def m_divider(self, char="\u2500"): return f"-{char}\n" + def m_escape(self, text): return text.replace("`", "\\`") + + def m_link(self, _label, _path, **fields): + field_str = "" + if fields: + field_parts = [] + for k, v in fields.items(): field_parts.append(f"{k}={v}") + field_str = "`" + "|".join(field_parts) + return f"`[{_label}`:{_path}{field_str}]" + + def m_align(self, text, align="left"): + align_tag = {"center": "c", "left": "l", "right": "r"}.get(align, "a") + return f"`{align_tag}{text}`a" + + + ######################### + # Page Serving Handlers # + ######################### + + def serve_front_page(self, path, data, request_id, link_id, remote_identity, requested_at): + st = time.time() + RNS.log(f"Front page request from {remote_identity}", RNS.LOG_DEBUG) + + content_parts = [] + content_parts.append(self.m_heading("Git Repositories", 1)) + content_parts.append(f"\n") + + accessible_groups = self.get_accessible_groups(remote_identity) + + if not accessible_groups: content_parts.append("No repository groups available.\n") + else: + content_parts.append(self.m_heading("Available Groups", 2)) + content_parts.append("\n") + + for group_name in sorted(accessible_groups.keys()): + group = accessible_groups[group_name] + repo_count = len(group.get("repositories", {})) + repo_word = "repository" if repo_count == 1 else "repositories" + + link = self.m_link(f"{group_name}", self.PATH_GROUP, g=group_name) + content_parts.append(f" {link} - {repo_count} {repo_word}\n") + + # content_parts.append("\n") + + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + def serve_group_page(self, path, data, request_id, link_id, remote_identity, requested_at): + st = time.time() + RNS.log(f"Group page request from {remote_identity}", RNS.LOG_DEBUG) + + group_name = data.get("var_g", "") if data else "" + accessible_repos = self.get_accessible_repositories(remote_identity, group_name) + + if not group_name or group_name not in self.owner.groups or not accessible_repos: + content = self.m_heading("Group Not Found", 1) + "\n\nThe requested group does not exist or you do not have access to any repositories in it.\n" + return self.render_template(content, st) + + content_parts = [] + + breadcrumb = f"{self.m_link('Groups', self.PATH_INDEX)} / {group_name}" + content_parts.append(self.m_align(breadcrumb, "left") + "\n\n") + content_parts.append(self.m_heading(group_name, 1)) + content_parts.append("\n") + + if not accessible_repos: content_parts.append("No repositories available in this group.\n") + else: + content_parts.append(self.m_heading("Repositories", 2)) + content_parts.append("\n") + + for repo_name in sorted(accessible_repos.keys()): + repo = accessible_repos[repo_name] + + description = self.get_repository_description(repo["path"]) + + link = self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name) + content_parts.append(f" {link}\n") + if description: content_parts.append(f" {self.m_escape(description)}\n") + content_parts.append("\n") + + content_parts.append("\n") + + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + def serve_repo_page(self, path, data, request_id, link_id, remote_identity, requested_at): + st = time.time() + RNS.log(f"Repository page request from {remote_identity}", RNS.LOG_DEBUG) + + group_name = data.get("var_g", "") if data else "" + repo_name = data.get("var_r", "") if data else "" + repo = self.get_accessible_repository(remote_identity, group_name, repo_name) + + if not repo: + content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n" + return self.render_template(content, st) + + content_parts = [] + + # Breadcrumb navigation + breadcrumb = f"{self.m_link('Groups', self.PATH_INDEX)} / {self.m_link(group_name, self.PATH_GROUP, g=group_name)} / {repo_name}" + content_parts.append(self.m_align(breadcrumb, "left") + "\n\n") + + content_parts.append(self.m_heading(repo_name, 1)) + content_parts.append("\n") + + # Repository metadata + description = self.get_repository_description(repo["path"]) + if description: content_parts.append(f"{self.m_escape(description)}\n\n") + + # Navigation links + content_parts.append(self.m_heading("Browse", 2)) + content_parts.append("\n") + content_parts.append(f" {self.m_link('๐Ÿ“ Files', self.PATH_TREE, g=group_name, r=repo_name, ref='HEAD')}\n") + content_parts.append(f" {self.m_link('๐Ÿ“ Commits', self.PATH_COMMITS, g=group_name, r=repo_name, ref='HEAD')}\n") + content_parts.append(f" {self.m_link('๐Ÿท๏ธ Refs', self.PATH_REFS, g=group_name, r=repo_name)}\n") + content_parts.append("\n") + + # Get refs information + refs = self.get_repository_refs(repo["path"]) + if refs: + content_parts.append(self.m_heading("Branches", 2)) + content_parts.append("\n") + + # Display first 10 branches + for ref in refs.get("heads", [])[:10]: content_parts.append(f" {ref['name']}: {ref['short_hash']}\n") + if len(refs.get("heads", [])) > 10: content_parts.append(f" ... and {len(refs['heads']) - 10} more\n") + content_parts.append("\n") + + if refs.get("tags"): + content_parts.append(self.m_heading("Tags", 2)) + content_parts.append("\n") + + # Display first 10 tags + for ref in refs["tags"][:10]: content_parts.append(f" {ref['name']}: {ref['short_hash']}\n") + if len(refs["tags"]) > 10: content_parts.append(f" ... and {len(refs['tags']) - 10} more\n") + content_parts.append("\n") + + # Readme content + readme_content, readme_is_markdown = self.get_readme_content(repo["path"]) + if readme_content is not None: + content_parts.append(self.m_heading("README", 2)) + content_parts.append("\n") + content_parts.append(self.m_divider()) + content_parts.append("\n") + + if readme_is_markdown: + converted = self.md_converter.format_block(readme_content) + content_parts.append(converted) + + else: content_parts.append(f"`=\n{readme_content}\n`=") + + content_parts.append("\n") + content_parts.append(self.m_divider()) + + else: + content_parts.append(self.m_italic("No README file found in this repository.")) + content_parts.append("\n") + + content_parts.append("\n") + + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + def serve_tree_page(self, path, data, request_id, link_id, remote_identity, requested_at): + st = time.time() + RNS.log(f"Tree page request from {remote_identity}", RNS.LOG_DEBUG) + + group_name = data.get("var_g", "") if data else "" + repo_name = data.get("var_r", "") if data else "" + ref = data.get("var_ref", "HEAD") if data else "HEAD" + tree_path = data.get("var_path", "") if data else "" + page_num = 0 + + try: + page_str = data.get("var_page", "0") if data else "0" + page_num = max(0, int(page_str)) + + except (ValueError, TypeError): page_num = 0 + + repo = self.get_accessible_repository(remote_identity, group_name, repo_name) + if not repo: + content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n" + return self.render_template(content, st) + + repo_path = repo["path"] + + # Validate ref exists + resolved_ref = self.resolve_ref(repo_path, ref) + if not resolved_ref: + content = self.m_heading("Ref Not Found", 1) + f"\n\nThe ref '{ref}' does not exist in this repository.\n" + content += f"\n" + self.m_link("View All Refs", self.PATH_REFS, g=group_name, r=repo_name) + "\n" + return self.render_template(content, st) + + content_parts = [] + + # Breadcrumb navigation + breadcrumb_parts = [ self.m_link("Groups", self.PATH_INDEX), + self.m_link(group_name, self.PATH_GROUP, g=group_name), + self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name) ] + + # Add path components to breadcrumb + if tree_path: + path_components = tree_path.strip("/").split("/") + current_path = "" + for i, component in enumerate(path_components): + current_path = current_path + "/" + component if current_path else component + if i == len(path_components) - 1: breadcrumb_parts.append(component) # Last component not a link + else: breadcrumb_parts.append(self.m_link(component, self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=current_path)) + + else: breadcrumb_parts.append("root") + + breadcrumb = " / ".join(breadcrumb_parts) + content_parts.append(self.m_align(breadcrumb, "left") + "\n\n") + + content_parts.append(self.m_heading(f"Tree: {tree_path or 'root'}", 1)) + content_parts.append(f"`F666Ref: {ref} ({resolved_ref[:8]})`f\n\n") + + # Get tree entries + entries = self.get_tree_entries(repo_path, resolved_ref, tree_path) + + if entries is None: content_parts.append("Error reading directory contents.\n") + elif not entries: content_parts.append("Empty directory.\n") + else: + # Sort: Directories first, then files, both alphabetically + def sort_key(entry): + is_dir = entry["type"] in ("tree", "commit") # commit = submodule + return (not is_dir, entry["name"].lower()) + + entries.sort(key=sort_key) + + # Pagination + total_entries = len(entries) + start_idx = page_num * self.TREE_ENTRIES_PER_PAGE + end_idx = start_idx + self.TREE_ENTRIES_PER_PAGE + page_entries = entries[start_idx:end_idx] + + if total_entries > self.TREE_ENTRIES_PER_PAGE: + content_parts.append(f"`F666Showing {start_idx + 1}-{min(end_idx, total_entries)} of {total_entries} entries`f\n\n") + + content_parts.append(self.m_heading("Contents", 2)) + content_parts.append("\n") + + # Parent directory link (if not at root) + if tree_path: + parent_path = "/".join(tree_path.rstrip("/").split("/")[:-1]) + parent_link = self.m_link("../ (parent directory)", self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=parent_path) + content_parts.append(f" {parent_link}\n") + + for entry in page_entries: + entry_name = entry["name"] + entry_type = entry["type"] + entry_mode = entry.get("mode", "") + + # Directory + if entry_type == "tree": + subpath = tree_path + "/" + entry_name if tree_path else entry_name + link = self.m_link(entry_name + "/", self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=subpath) + content_parts.append(f" `F66d๐Ÿ“`f {link}\n") + + # Submodule + elif entry_type == "commit": + content_parts.append(f" `F66dโง‰`f {entry_name} `F666(submodule)`f\n") + + # Symlink + elif entry_type == "link": + target = entry.get("link_target", "unknown") + content_parts.append(f" `F66dโ†ณ`f {entry_name} `F666โ†’ {self.m_escape(target)}`f\n") + + # File (blob) + else: + size_str = self.format_size(entry.get("size", 0)) + subpath = tree_path + "/" + entry_name if tree_path else entry_name + link = self.m_link(entry_name, self.PATH_BLOB, g=group_name, r=repo_name, ref=ref, path=subpath) + content_parts.append(f" `F66d๐Ÿ“„`f {link} `F666({size_str})`f\n") + + content_parts.append("\n") + + # Pagination controls + if total_entries > self.TREE_ENTRIES_PER_PAGE: + content_parts.append(self.m_heading("Navigation", 2)) + content_parts.append("\n") + + nav_links = [] + if page_num > 0: + nav_links.append(self.m_link("ยซ Previous", self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=tree_path, page=page_num - 1)) + + total_pages = (total_entries + self.TREE_ENTRIES_PER_PAGE - 1) // self.TREE_ENTRIES_PER_PAGE + nav_links.append(f"Page {page_num + 1} of {total_pages}") + + if end_idx < total_entries: + nav_links.append(self.m_link("Next ยป", self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=tree_path, page=page_num + 1)) + + content_parts.append(" " + " | ".join(nav_links) + "\n") + + content_parts.append("\n") + + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + def serve_blob_page(self, path, data, request_id, link_id, remote_identity, requested_at): + st = time.time() + RNS.log(f"Blob page request from {remote_identity}", RNS.LOG_DEBUG) + + group_name = data.get("var_g", "") if data else "" + repo_name = data.get("var_r", "") if data else "" + ref = data.get("var_ref", "HEAD") if data else "HEAD" + file_path = data.get("var_path", "") if data else "" + + repo = self.get_accessible_repository(remote_identity, group_name, repo_name) + if not repo: + content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n" + return self.render_template(content, st) + + repo_path = repo["path"] + + # Validate ref exists + resolved_ref = self.resolve_ref(repo_path, ref) + if not resolved_ref: + content = self.m_heading("Ref Not Found", 1) + f"\n\nThe ref '{ref}' does not exist in this repository.\n" + return self.render_template(content, st) + + # Validate file path + if not file_path: + content = self.m_heading("Invalid Path", 1) + "\n\nNo file path specified.\n" + return self.render_template(content, st) + + content_parts = [] + + # Breadcrumb navigation + breadcrumb_parts = [ self.m_link("Groups", self.PATH_INDEX), + self.m_link(group_name, self.PATH_GROUP, g=group_name), + self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name) ] + + # Add path components + path_components = file_path.strip("/").split("/") + current_path = "" + for i, component in enumerate(path_components): + current_path = current_path + "/" + component if current_path else component + if i == len(path_components) - 1: breadcrumb_parts.append(component) # Last component (file) not a link + else: breadcrumb_parts.append(self.m_link(component, self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=current_path)) + + breadcrumb = " / ".join(breadcrumb_parts) + content_parts.append(self.m_align(breadcrumb, "left") + "\n\n") + + content_parts.append(self.m_heading(file_path, 1)) + content_parts.append(f"`F666Ref: {ref} ({resolved_ref[:8]})`f\n\n") + + # Get blob info + blob_info = self.get_blob_info(repo_path, resolved_ref, file_path) + + if blob_info is None: content_parts.append("File not found at this ref.\n") + else: + size = blob_info.get("size", 0) + is_binary = blob_info.get("is_binary", False) + is_symlink = blob_info.get("is_symlink", False) + symlink_target = blob_info.get("symlink_target") + + # File metadata + content_parts.append(self.m_heading("File Info", 2)) + content_parts.append("\n") + content_parts.append(f" Size: {self.format_size(size)} ({size} bytes)\n") + content_parts.append(f" Type: {'Binary' if is_binary else 'Text'}\n") + if is_symlink: content_parts.append(f" Symlink: โ†’ {self.m_escape(symlink_target or 'unknown')}\n") + content_parts.append("\n") + + # Content display + if is_symlink: + content_parts.append(self.m_heading("Symlink Target", 2)) + content_parts.append("\n") + content_parts.append(f" {self.m_escape(symlink_target or 'unknown')}\n") + + elif is_binary: + content_parts.append(self.m_heading("Binary File", 2)) + content_parts.append("\n") + content_parts.append("This file appears to be binary and cannot be displayed as text.\n") + # TODO: Implement raw file downloads + + elif size > self.BLOB_SIZE_LIMIT: + content_parts.append(self.m_heading("File Too Large", 2)) + content_parts.append("\n") + content_parts.append(f"This file is {self.format_size(size)}, which exceeds the display limit of {self.format_size(self.BLOB_SIZE_LIMIT)}.\n") + + else: + # Display file content + content = self.get_blob_content(repo_path, resolved_ref, file_path) + if content is not None: + content_parts.append(self.m_heading("Content", 2)) + content_parts.append("\n") + content_parts.append(self.m_divider()) + content_parts.append("\n") + content_parts.append(f"`=\n{content}\n`=") + content_parts.append("\n") + content_parts.append(self.m_divider()) + else: + content_parts.append("Error reading file content.\n") + + content_parts.append("\n") + + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + def serve_commits_page(self, path, data, request_id, link_id, remote_identity, requested_at): + st = time.time() + RNS.log(f"Commits page request from {remote_identity}", RNS.LOG_DEBUG) + + group_name = data.get("var_g", "") if data else "" + repo_name = data.get("var_r", "") if data else "" + ref = data.get("var_ref", "HEAD") if data else "HEAD" + file_path = data.get("var_path", "") if data else "" + page_num = 0 + + try: + page_str = data.get("var_page", "0") if data else "0" + page_num = max(0, int(page_str)) + + except (ValueError, TypeError): page_num = 0 + + repo = self.get_accessible_repository(remote_identity, group_name, repo_name) + if not repo: + content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n" + return self.render_template(content, st) + + repo_path = repo["path"] + + # Validate ref exists + resolved_ref = self.resolve_ref(repo_path, ref) + if not resolved_ref: + content = self.m_heading("Ref Not Found", 1) + f"\n\nThe ref '{ref}' does not exist in this repository.\n" + return self.render_template(content, st) + + content_parts = [] + + # Breadcrumb navigation + breadcrumb_parts = [ self.m_link("Groups", self.PATH_INDEX), + self.m_link(group_name, self.PATH_GROUP, g=group_name), + self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name), + "commits" ] + + if file_path: breadcrumb_parts.insert(3, f"{self.m_escape(file_path)}") + + breadcrumb = " / ".join(breadcrumb_parts) + content_parts.append(self.m_align(breadcrumb, "left") + "\n\n") + + title_suffix = f" ({file_path})" if file_path else "" + content_parts.append(self.m_heading(f"Commit History{title_suffix}", 1)) + content_parts.append(f"`F666Ref: {ref} ({resolved_ref[:8]})`f\n\n") + + # Get commits + skip = page_num * self.COMMITS_PER_PAGE + commits = self.get_commits(repo_path, resolved_ref, file_path, skip, self.COMMITS_PER_PAGE) + + if commits is None: content_parts.append("Error reading commit history.\n") + elif not commits: content_parts.append("No commits found.\n") + else: + content_parts.append(self.m_heading("Commits", 2)) + content_parts.append("\n") + + for commit in commits: + short_hash = commit["hash"][:7] + subject = commit["subject"] + author = commit["author"] + date = self.format_relative_time(commit["timestamp"]) + + hash_link = self.m_link(short_hash, self.PATH_COMMIT, g=group_name, r=repo_name, h=commit["hash"]) + + content_parts.append(f" `F66d{hash_link}`f {self.m_escape(subject)}\n") + content_parts.append(f" {self.m_escape(author)} โ€” {date}\n\n") + + # Pagination controls + has_more = len(commits) == self.COMMITS_PER_PAGE + + if page_num > 0 or has_more: + content_parts.append(self.m_heading("Navigation", 2)) + content_parts.append("\n") + + nav_links = [] + if page_num > 0: nav_links.append(self.m_link("ยซ Newer", self.PATH_COMMITS, g=group_name, r=repo_name, ref=ref, path=file_path, page=page_num - 1)) + nav_links.append(f"Page {page_num + 1}") + if has_more: nav_links.append(self.m_link("Older ยป", self.PATH_COMMITS, g=group_name, r=repo_name, ref=ref, path=file_path, page=page_num + 1)) + content_parts.append(" " + " | ".join(nav_links) + "\n") + + content_parts.append("\n") + + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + def serve_commit_page(self, path, data, request_id, link_id, remote_identity, requested_at): + st = time.time() + RNS.log(f"Commit page request from {remote_identity}", RNS.LOG_DEBUG) + + group_name = data.get("var_g", "") if data else "" + repo_name = data.get("var_r", "") if data else "" + commit_hash = data.get("var_h", "") if data else "" + + repo = self.get_accessible_repository(remote_identity, group_name, repo_name) + if not repo: + content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n" + return self.render_template(content, st) + + repo_path = repo["path"] + + # Validate commit hash + if not commit_hash or len(commit_hash) < 7: + content = self.m_heading("Invalid Commit", 1) + "\n\nNo valid commit hash specified.\n" + return self.render_template(content, st) + + # Resolve and validate the commit hash + resolved_hash = self.resolve_ref(repo_path, commit_hash) + if not resolved_hash: + content = self.m_heading("Commit Not Found", 1) + f"\n\nThe commit '{commit_hash}' does not exist in this repository.\n" + return self.render_template(content, st) + + # Verify it's actually a commit object + try: + type_result = subprocess.run(["git", "cat-file", "-t", resolved_hash], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if type_result.returncode != 0 or type_result.stdout.strip() != "commit": + content = self.m_heading("Invalid Object", 1) + f"\n\nThe hash '{commit_hash}' does not refer to a commit.\n" + return self.render_template(content, st) + + except Exception: + content = self.m_heading("Error", 1) + "\n\nCould not verify commit object.\n" + return self.render_template(content, st) + + content_parts = [] + + # Breadcrumb navigation + breadcrumb = f"{self.m_link('Groups', self.PATH_INDEX)} / {self.m_link(group_name, self.PATH_GROUP, g=group_name)} / {self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name)} / {resolved_hash[:7]}" + content_parts.append(self.m_align(breadcrumb, "left") + "\n\n") + + commit_info = self.get_commit_info(repo_path, resolved_hash) + if not commit_info: + content_parts.append(self.m_heading("Error", 1) + "\n\nCould not retrieve commit information.\n") + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + content_parts.append(self.m_heading(f"Commit {resolved_hash[:7]}", 1)) + content_parts.append("\n") + + # Commit metadata + content_parts.append(self.m_heading("Details", 2)) + content_parts.append("\n") + content_parts.append(f" Hash: {resolved_hash}\n") + + if commit_info.get("parents"): + parent_links = [] + for parent_hash in commit_info["parents"]: + parent_link = self.m_link(parent_hash[:7], self.PATH_COMMIT, g=group_name, r=repo_name, h=parent_hash) + parent_links.append(parent_link) + + content_parts.append(f" Parents: {' '.join(parent_links)}\n") + + content_parts.append(f" Author: {self.m_escape(commit_info['author_name'])} <{self.m_escape(commit_info['author_email'])}>\n") + content_parts.append(f" Date: {commit_info['author_date']}\n") + + if commit_info.get("committer_name") != commit_info.get("author_name"): + content_parts.append(f" Commit: {self.m_escape(commit_info['committer_name'])} <{self.m_escape(commit_info['committer_email'])}>\n") + content_parts.append(f" Date: {commit_info['committer_date']}\n") + + content_parts.append("\n") + + # Commit message + if commit_info.get("message"): + content_parts.append(self.m_heading("Message", 2)) + content_parts.append("\n") + content_parts.append(self.m_escape(commit_info["message"]) + "\n") + content_parts.append("\n") + + # Changed files + if commit_info.get("files"): + content_parts.append(self.m_heading("Changes", 2)) + content_parts.append("\n") + + total_additions = sum(f.get("additions", 0) for f in commit_info["files"]) + total_deletions = sum(f.get("deletions", 0) for f in commit_info["files"]) + + content_parts.append(f" {len(commit_info['files'])} files changed, {total_additions} insertions(+), {total_deletions} deletions(-)\n\n") + + for file_info in commit_info["files"]: + status = file_info.get("status", "M") + file_path = file_info.get("path", "unknown") + additions = file_info.get("additions", 0) + deletions = file_info.get("deletions", 0) + + status_indicators = {"A": "`F0a0A`f", # Added - green + "D": "`F900D`f", # Deleted - red + "M": "`Faa0M`f", # Modified - yellow + "R": "`F0aaR`f" } # Renamed - cyan + + status_display = status_indicators.get(status, status) + + # File path as link to blob at this commit + file_link = self.m_link(self.m_escape(file_path), self.PATH_BLOB, g=group_name, r=repo_name, ref=resolved_hash, path=file_path) + + stats = [] + if additions > 0: stats.append(f"`F0a0+{additions}`f") + if deletions > 0: stats.append(f"`F900-{deletions}`f") + + stats_str = " ".join(stats) if stats else "" + content_parts.append(f" {status_display} {file_link} {stats_str}\n") + + content_parts.append("\n") + + if self.SHOW_DIFF_BY_DEFAULT and commit_info.get("diff"): + content_parts.append(self.m_heading("Diff", 2)) + content_parts.append("\n") + content_parts.append(self.m_divider()) + content_parts.append("\n") + content_parts.append("`=\n"+commit_info["diff"]+"\n`=") + content_parts.append("\n") + content_parts.append(self.m_divider()) + + # Navigation to tree at this commit + content_parts.append("\n") + content_parts.append(self.m_heading("Browse", 2)) + content_parts.append("\n") + content_parts.append(f" {self.m_link('๐Ÿ“ Browse files at this commit', self.PATH_TREE, g=group_name, r=repo_name, ref=resolved_hash)}\n") + content_parts.append("\n") + + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + def serve_refs_page(self, path, data, request_id, link_id, remote_identity, requested_at): + st = time.time() + RNS.log(f"Refs page request from {remote_identity}", RNS.LOG_DEBUG) + + group_name = data.get("var_g", "") if data else "" + repo_name = data.get("var_r", "") if data else "" + ref_type = data.get("var_type", "") if data else "" # "heads", "tags", or empty for both + + repo = self.get_accessible_repository(remote_identity, group_name, repo_name) + if not repo: + content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n" + return self.render_template(content, st) + + repo_path = repo["path"] + + content_parts = [] + + # Breadcrumb navigation + breadcrumb = f"{self.m_link('Groups', self.PATH_INDEX)} / {self.m_link(group_name, self.PATH_GROUP, g=group_name)} / {self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name)} / refs" + content_parts.append(self.m_align(breadcrumb, "left") + "\n\n") + + content_parts.append(self.m_heading("Refs", 1)) + content_parts.append("\n") + + # Get default branch (HEAD) + default_branch = None + try: + head_result = subprocess.run(["git", "symbolic-ref", "HEAD"], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if head_result.returncode == 0: default_branch = head_result.stdout.strip().replace("refs/heads/", "") + + except Exception: pass + + show_heads = not ref_type or ref_type == "heads" + show_tags = not ref_type or ref_type == "tags" + + refs_data = self.get_refs_info(repo_path, default_branch) + + if show_heads and refs_data.get("heads"): + content_parts.append(self.m_heading(f"Branches ({len(refs_data['heads'])})", 2)) + content_parts.append("\n") + + for ref_info in refs_data["heads"]: + branch_name = ref_info["name"] + short_hash = ref_info["short_hash"] + is_default = ref_info.get("is_default", False) + commit_subject = ref_info.get("commit_subject", "") + + # Branch name with default indicator + name_display = f"`F0a0{branch_name}`f" if is_default else branch_name + default_marker = " `F0a0(default)`f" if is_default else "" + + # Links to tree and commits at this branch + tree_link = self.m_link("tree", self.PATH_TREE, g=group_name, r=repo_name, ref=branch_name) + commits_link = self.m_link("commits", self.PATH_COMMITS, g=group_name, r=repo_name, ref=branch_name) + + content_parts.append(f" {name_display}{default_marker}\n") + content_parts.append(f" {short_hash}: {self.m_escape(commit_subject)}\n") + content_parts.append(f" [{tree_link}] [{commits_link}]\n\n") + + if show_tags and refs_data.get("tags"): + content_parts.append(self.m_heading(f"Tags ({len(refs_data['tags'])})", 2)) + content_parts.append("\n") + + for ref_info in refs_data["tags"]: + tag_name = ref_info["name"] + short_hash = ref_info["short_hash"] + is_annotated = ref_info.get("is_annotated", False) + tag_message = ref_info.get("tag_message", "") + commit_subject = ref_info.get("commit_subject", "") + + # Tag name with annotated indicator + annotated_marker = " `Faa0(annotated)`f" if is_annotated else "" + + # Links to tree and commits at this tag + tree_link = self.m_link("tree", self.PATH_TREE, g=group_name, r=repo_name, ref=tag_name) + commits_link = self.m_link("commits", self.PATH_COMMITS, g=group_name, r=repo_name, ref=tag_name) + + content_parts.append(f" {tag_name}{annotated_marker}\n") + if is_annotated and tag_message: content_parts.append(f" Tag: {self.m_escape(tag_message[:60])}\n") + content_parts.append(f" {short_hash}: {self.m_escape(commit_subject)}\n") + content_parts.append(f" [{tree_link}] [{commits_link}]\n\n") + + # No refs found + if (show_heads and not refs_data.get("heads")) and (show_tags and not refs_data.get("tags")): + content_parts.append("No refs found in this repository.\n") + + content_parts.append("\n") + + # Filtering links + content_parts.append(self.m_heading("Filter", 2)) + content_parts.append("\n") + filter_links = [] + filter_links.append(self.m_link("All", self.PATH_REFS, g=group_name, r=repo_name)) + filter_links.append(self.m_link("Branches only", self.PATH_REFS, g=group_name, r=repo_name, type="heads")) + filter_links.append(self.m_link("Tags only", self.PATH_REFS, g=group_name, r=repo_name, type="tags")) + content_parts.append(" " + " | ".join(filter_links) + "\n") + + content_parts.append("\n") + + page_content = "".join(content_parts) + return self.render_template(page_content, st) + + ####################### + # Git Data Extraction # + ####################### + + def get_repository_description(self, repo_path): + try: + # Try git config first + result = subprocess.run(["git", "config", "--get", "repository.description"], + cwd=repo_path, capture_output=True, text=True, check=False) + + if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip() + + # Fall back to description file + desc_path = os.path.join(repo_path, "description") + if os.path.isfile(desc_path): + with open(desc_path, "r") as f: + desc = f.read().strip() + # Skip default template descriptions + if desc and not desc.startswith("Unnamed repository"): return desc + + except Exception as e: RNS.log(f"Error getting repository description: {e}", RNS.LOG_DEBUG) + + return None + + def get_repository_refs(self, repo_path): + refs = {"heads": [], "tags": []} + + try: + # Get all refs with their hashes + result = subprocess.run(["git", "for-each-ref", "--format", "%(objectname) %(refname) %(refname:short)", "refs/heads", "refs/tags"], + cwd=repo_path, capture_output=True, text=True, check=False) + + if result.returncode != 0: return refs + + for line in result.stdout.strip().split("\n"): + if not line.strip(): continue + parts = line.split(" ", 2) + if len(parts) >= 2: + full_hash = parts[0] + ref_name = parts[1] + short_name = parts[2] if len(parts) > 2 else ref_name + short_hash = full_hash[:7] + + ref_info = { "name": short_name, + "hash": full_hash, + "short_hash": short_hash } + + if ref_name.startswith("refs/heads/"): refs["heads"].append(ref_info) + elif ref_name.startswith("refs/tags/"): refs["tags"].append(ref_info) + + except Exception as e: RNS.log(f"Error getting repository refs: {e}", RNS.LOG_DEBUG) + + return refs + + def resolve_ref(self, repo_path, ref): + try: + result = subprocess.run(["git", "rev-parse", "--verify", ref], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if result.returncode == 0: + hash_val = result.stdout.strip() + # Validate it's a 40-char hex string + if len(hash_val) == 40 and all(c in "0123456789abcdef" for c in hash_val.lower()): return hash_val.lower() + + except subprocess.TimeoutExpired: RNS.log(f"Timeout resolving ref '{ref}'", RNS.LOG_WARNING) + except Exception as e: RNS.log(f"Error resolving ref: {e}", RNS.LOG_WARNING) + return None + + def get_tree_entries(self, repo_path, ref, path): + entries = [] + tree_path = path.strip("/") if path else "" + + try: + # Use git ls-tree to list directory contents + ls_tree_path = f"{ref}:{tree_path}" if tree_path else ref + result = subprocess.run(["git", "ls-tree", "-l", ls_tree_path], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if result.returncode != 0: + # Check if it's actually a tree + cat_result = subprocess.run(["git", "cat-file", "-t", ls_tree_path], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if cat_result.returncode != 0 or cat_result.stdout.strip() != "tree": return None # Not a valid tree + return [] # Valid but empty tree + + for line in result.stdout.strip().split("\n"): + if not line.strip(): continue + + # Parse ls-tree output: \t + # Format: 100644 blob abc123 1234\tfilename + parts = line.split("\t", 1) + if len(parts) != 2: continue + + meta_part = parts[0] + name = parts[1] + + meta_parts = meta_part.split() + if len(meta_parts) < 3: continue + + mode = meta_parts[0] + obj_type = meta_parts[1] + # sha = meta_parts[2] + size = 0 + if len(meta_parts) >= 4: + try: size = int(meta_parts[3]) + except ValueError: size = 0 + + entry = { "name": name, + "type": obj_type, # blob, tree, or commit + "mode": mode, + "size": size, + "link_target": None } + + # Detect symlinks (mode 120000) and get symlink target + if mode == "120000": + entry["type"] = "link" + try: + symlink_result = subprocess.run(["git", "show", f"{ref}:{tree_path}/{name}" if tree_path else f"{ref}:{name}"], + cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if symlink_result.returncode == 0: entry["link_target"] = symlink_result.stdout.strip() + + except Exception: pass + + entries.append(entry) + + except subprocess.TimeoutExpired: + RNS.log(f"Timeout listing tree contents", RNS.LOG_WARNING) + return None + + except Exception as e: + RNS.log(f"Error getting tree entries: {e}", RNS.LOG_WARNING) + return None + + return entries + + def get_blob_info(self, repo_path, ref, path): + file_path = path.strip("/") + + try: + # Get object info + result = subprocess.run(["git", "cat-file", "-s", f"{ref}:{file_path}"], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if result.returncode != 0: return None + + size = int(result.stdout.strip()) + + # Check if it's a symlink via ls-tree + parent_dir = "/".join(file_path.split("/")[:-1]) + filename = file_path.split("/")[-1] + + ls_tree_path = f"{ref}:{parent_dir}" if parent_dir else ref + result = subprocess.run(["git", "ls-tree", ls_tree_path], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + is_symlink = False + if result.returncode == 0: + for line in result.stdout.strip().split("\n"): + if f"\t{filename}" in line and line.startswith("120000"): + is_symlink = True + break + + # Get symlink target if applicable + symlink_target = None + if is_symlink: + content_result = subprocess.run(["git", "show", f"{ref}:{file_path}"], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if content_result.returncode == 0: symlink_target = content_result.stdout.strip() + + # Binary detection using git diff --numstat + is_binary = False + if not is_symlink: + # Try to get a sample of the file to check for null bytes + sample_result = subprocess.run(["git", "show", f"{ref}:{file_path}"], + cwd=repo_path, capture_output=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if sample_result.returncode == 0: + # Check for null bytes in first 8KB + sample = sample_result.stdout[:8192] + if b'\x00' in sample: is_binary = True + else: + # Also check using git diff approach for text encoding issues + diff_result = subprocess.run(["git", "diff", "--numstat", "--no-index", "--", "/dev/null", f"{ref}:{file_path}"], + cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + # If git says it's binary, the line will start with "-" "-" + if diff_result.returncode == 1: # git diff returns 1 for differences + first_line = diff_result.stdout.strip().split("\n")[0] if diff_result.stdout else "" + if first_line.startswith("-"): is_binary = True + + return { "size": size, + "is_binary": is_binary, + "is_symlink": is_symlink, + "symlink_target": symlink_target } + + except subprocess.TimeoutExpired: + RNS.log(f"Timeout getting blob info", RNS.LOG_DEBUG) + return None + + except Exception as e: + RNS.log(f"Error getting blob info: {e}", RNS.LOG_DEBUG) + return None + + def get_blob_content(self, repo_path, ref, path): + file_path = path.strip("/") + + try: + result = subprocess.run(["git", "show", f"{ref}:{file_path}"], + cwd=repo_path, capture_output=True, text=True, + timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if result.returncode == 0: return result.stdout + + except subprocess.TimeoutExpired: RNS.log(f"Timeout getting blob content", RNS.LOG_WARNING) + except Exception as e: RNS.log(f"Error getting blob content: {e}", RNS.LOG_WARNING) + + return None + + def get_refs_info(self, repo_path, default_branch=None): + refs = {"heads": [], "tags": []} + + try: + # Get all refs with their hashes and commit info + # Format: objectname refname refname:short subject + result = subprocess.run(["git", "for-each-ref", "--format=%(objectname)|%(refname)|%(refname:short)|%(subject)", "refs/heads", "refs/tags"], + cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if result.returncode != 0: return refs + + for line in result.stdout.strip().split("\n"): + if not line.strip(): continue + + parts = line.split("|", 3) + if len(parts) < 3: continue + + full_hash = parts[0] + ref_name = parts[1] + short_name = parts[2] + subject = parts[3] if len(parts) > 3 else "" + short_hash = full_hash[:7] + + ref_info = { "name": short_name, + "hash": full_hash, + "short_hash": short_hash, + "commit_subject": subject, + "is_default": short_name == default_branch, + "is_annotated": False, + "tag_message": None } + + if ref_name.startswith("refs/heads/"): refs["heads"].append(ref_info) + + elif ref_name.startswith("refs/tags/"): + # Check if it's an annotated tag + try: + # Get the object type this tag points to + tag_result = subprocess.run(["git", "for-each-ref", "--format=%(objecttype)|%(contents:subject)", ref_name], + cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if tag_result.returncode == 0: + tag_parts = tag_result.stdout.strip().split("|", 1) + if len(tag_parts) >= 2: + obj_type = tag_parts[0] + if obj_type == "tag": + ref_info["is_annotated"] = True + ref_info["tag_message"] = tag_parts[1] + + except Exception: pass + + refs["tags"].append(ref_info) + + except subprocess.TimeoutExpired: RNS.log(f"Timeout getting refs info", RNS.LOG_WARNING) + except Exception as e: RNS.log(f"Error getting refs info: {e}", RNS.LOG_WARNING) + + return refs + + def get_commits(self, repo_path, ref, file_path, skip, limit): + commits = [] + + try: + cmd = ["git", "log", "--format=%H|%s|%an|%ae|%at", "--skip", str(skip), "-n", str(limit), ref] + if file_path: cmd.extend(["--", file_path]) + + result = subprocess.run(cmd, cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if result.returncode != 0: return None + + for line in result.stdout.strip().split("\n"): + if not line.strip(): continue + + parts = line.split("|", 4) + if len(parts) >= 5: + commits.append({ "hash": parts[0], + "subject": parts[1], + "author": parts[2], + "author_email": parts[3], + "timestamp": int(parts[4]) }) + + except subprocess.TimeoutExpired: + RNS.log(f"Timeout getting commits", RNS.LOG_DEBUG) + return None + + except Exception as e: + RNS.log(f"Error getting commits: {e}", RNS.LOG_DEBUG) + return None + + return commits + + def get_commit_info(self, repo_path, commit_hash): + try: + # Get commit metadata + format_str = "%P%n%an%n%ae%n%aI%n%cn%n%ce%n%cI%n%B" + result = subprocess.run(["git", "show", "--no-patch", "--format=" + format_str, commit_hash], + cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if result.returncode != 0: return None + + lines = result.stdout.split("\n") + if len(lines) < 7: return None + + # Parse parents (space-separated hashes on first line) + parents = lines[0].strip().split() if lines[0].strip() else [] + + info = { "parents": parents, + "author_name": lines[1], + "author_email": lines[2], + "author_date": lines[3], + "committer_name": lines[4], + "committer_email": lines[5], + "committer_date": lines[6], + "message": "\n".join(lines[7:]).strip(), + "files": [], + "diff": None } + + # Get file change statistics + stats_result = subprocess.run(["git", "diff-tree", "--numstat", "-r", commit_hash], + cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if stats_result.returncode == 0: + for line in stats_result.stdout.strip().split("\n"): + if not line.strip(): continue + + # Parse numstat output: \t\t + parts = line.split("\t") + if len(parts) >= 3: + additions = parts[0] + deletions = parts[1] + file_path = parts[2] + + # Detect status based on additions/deletions pattern + status = "M" # Modified by default + if additions == "-" and deletions == "-": + status = "R" # Renamed (binary or unmerged) + additions = "0" + deletions = "0" + + # Check for added/deleted via --diff-filter would require second call + # Instead, we'll use a simpler approach: check if file exists in parent + if info["parents"]: + parent = info["parents"][0] + # Check if file exists in parent + check_result = subprocess.run(["git", "cat-file", "-e", f"{parent}:{file_path}"], + cwd=repo_path, capture_output=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if check_result.returncode != 0: status = "A" # Added (didn't exist in parent) + else: + # Check if file exists in current commit + check_current = subprocess.run(["git", "cat-file", "-e", f"{commit_hash}:{file_path}"], + cwd=repo_path, capture_output=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if check_current.returncode != 0: status = "D" # Deleted (doesn't exist in current) + + try: + add_count = int(additions) if additions != "-" else 0 + del_count = int(deletions) if deletions != "-" else 0 + + except ValueError: + add_count = 0 + del_count = 0 + + info["files"].append({ "path": file_path, + "status": status, + "additions": add_count, + "deletions": del_count }) + + # Get diff if enabled + if self.SHOW_DIFF_BY_DEFAULT: + diff_result = subprocess.run(["git", "show", "--format=", commit_hash], + cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False) + + if diff_result.returncode == 0: info["diff"] = diff_result.stdout + + return info + + except subprocess.TimeoutExpired: + RNS.log(f"Timeout getting commit info", RNS.LOG_WARNING) + return None + + except Exception as e: + RNS.log(f"Error getting commit info: {e}", RNS.LOG_WARNING) + return None + + def get_readme_content(self, repo_path): + readme_names = [ ("README.md", True), ("README.rst", False), ("README.txt", False), ("README", False), + ("readme.md", True), ("readme.rst", False), ("readme.txt", False), ("readme", False) ] + + for readme_name, is_markdown in readme_names: + try: + result = subprocess.run(["git", "show", f"HEAD:{readme_name}"], + cwd=repo_path, capture_output=True, text=True, check=False) + + if result.returncode == 0: return result.stdout, is_markdown + + except Exception: continue + + return None, False + + ################### + # Utility Methods # + ################### + + def format_size(self, size_bytes): return RNS.prettysize(size_bytes) + + def format_relative_time(self, timestamp): + now = time.time() + diff = now - timestamp + + if diff < 60: + return "just now" + elif diff < 3600: + minutes = int(diff / 60) + return f"{minutes} minute{'s' if minutes != 1 else ''} ago" + elif diff < 86400: + hours = int(diff / 3600) + return f"{hours} hour{'s' if hours != 1 else ''} ago" + elif diff < 604800: + days = int(diff / 86400) + return f"{days} day{'s' if days != 1 else ''} ago" + elif diff < 2592000: + weeks = int(diff / 604800) + return f"{weeks} week{'s' if weeks != 1 else ''} ago" + elif diff < 31536000: + months = int(diff / 2592000) + return f"{months} month{'s' if months != 1 else ''} ago" + else: + years = int(diff / 31536000) + return f"{years} year{'s' if years != 1 else ''} ago" + + ####################### + # Connection Handlers # + ####################### def remote_connected(self, link): RNS.log(f"Peer connected to {self.destination}", RNS.LOG_DEBUG) @@ -93,4 +1340,45 @@ class NomadNetworkNode(): RNS.log(f"Peer disconnected from {self.destination}", RNS.LOG_DEBUG) def remote_identified(self, link, identity): - RNS.log(f"Peer identified as {link.get_remote_identity()} on {link}", RNS.LOG_DEBUG) \ No newline at end of file + RNS.log(f"Peer identified as {link.get_remote_identity()} on {link}", RNS.LOG_DEBUG) + + ###################### + # Permission Control # + ###################### + + def get_accessible_groups(self, remote_identity): + accessible_groups = {} + + for group_name, group_data in self.owner.groups.items(): + accessible_repos = self.get_accessible_repositories(remote_identity, group_name) + + if accessible_repos: + accessible_groups[group_name] = { "path": group_data["path"], "repositories": accessible_repos } + + return accessible_groups + + def get_accessible_repositories(self, remote_identity, group_name): + if group_name not in self.owner.groups: return {} + + group_data = self.owner.groups[group_name] + all_repos = group_data.get("repositories", {}) + accessible_repos = {} + + for repo_name, repo_data in all_repos.items(): + if self.resolve_permission(remote_identity, group_name, repo_name, self.owner.PERM_READ): + accessible_repos[repo_name] = repo_data + + return accessible_repos + + def get_accessible_repository(self, remote_identity, group_name, repo_name): + if group_name not in self.owner.groups: return None + + group_data = self.owner.groups[group_name] + all_repos = group_data.get("repositories", {}) + + if repo_name not in all_repos: return None + + if self.resolve_permission(remote_identity, group_name, repo_name, self.owner.PERM_READ): + return all_repos[repo_name] + + return None diff --git a/RNS/Utilities/rngit/server.py b/RNS/Utilities/rngit/server.py index c131dfe6..05a0cdab 100644 --- a/RNS/Utilities/rngit/server.py +++ b/RNS/Utilities/rngit/server.py @@ -323,7 +323,8 @@ class ReticulumGitNode(): elif self.TGT_ALL in repository_permissions: return True elif remote_hash in repository_permissions: return True else: - if self.TGT_NONE in group_permissions: return False + if len(repository_permissions) > 0: return False + elif self.TGT_NONE in group_permissions: return False elif self.TGT_ALL in group_permissions: return True elif remote_hash in group_permissions: return True else: return False diff --git a/RNS/Utilities/rngit/util.py b/RNS/Utilities/rngit/util.py new file mode 100644 index 00000000..59cfc7b9 --- /dev/null +++ b/RNS/Utilities/rngit/util.py @@ -0,0 +1,415 @@ +# Reticulum License +# +# Copyright (c) 2016-2026 Mark Qvist +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# - The Software shall not be used in any kind of system which includes amongst +# its functions the ability to purposefully do harm to human beings. +# +# - The Software shall not be used, directly or indirectly, in the creation of +# an artificial intelligence, machine learning or language model training +# dataset, including but not limited to any use that contributes to the +# training or development of such a model or algorithm. +# +# - The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import re +from typing import List + +class MarkdownToMicron: + BOLD = "`!" + BOLD_END = "`!" + ITALIC = "`*" + ITALIC_END = "`*" + UNDERLINE = "`_" + UNDERLINE_END = "`_" + + CODE_BG = "`B444" + CODE_FG = "`Fddd" + CODE_RESET = "`f`b" + + LITERAL_START = "`=" + LITERAL_END = "`=" + + BULLET = "โ€ข" + + # Regex patterns for markdown elements + HEADER_RE = re.compile(r'^(#{1,6})\s+(.+)$') + CODE_FENCE_RE = re.compile(r'^(\s*)```(.*)$') + HORIZONTAL_RULE_RE = re.compile(r'^(\s*)(---+|===+|\*\*\*+|___+)\s*$') + UNORDERED_LIST_RE = re.compile(r'^(\s*)([-*+])\s+(.+)$') + + # Table patterns + TABLE_ROW_RE = re.compile(r'^\s*\|?(.+?)\|?\s*$') + TABLE_SEP_RE = re.compile(r'^\s*\|?(?:\s*:?-+:?\s*\|)+\s*$') + + # Inline patterns (processed in order of specificity) + LINK_RE = re.compile(r'\[([^\]]+)\]\(([^)]+)\)') + INLINE_CODE_RE = re.compile(r'`([^`]+)`') + BOLD_RE = re.compile(r'\*\*(.+?)\*\*|__(.+?)__') + ITALIC_RE = re.compile(r'\*(.+?)\*|_(.+?)_') + + TABLE_H = "โ”€" + TABLE_V = "โ”‚" + TABLE_TL = "โ”Œ" + TABLE_TR = "โ”" + TABLE_BL = "โ””" + TABLE_BR = "โ”˜" + TABLE_ML = "โ”œ" + TABLE_MR = "โ”ค" + TABLE_TM = "โ”ฌ" + TABLE_BM = "โ”ด" + TABLE_MM = "โ”ผ" + + TABLE_MIN_COL_WIDTH = 3 + + def __init__(self, max_width=100): + self.max_width = max_width + + def format_block(self, text: str) -> str: + lines = text.split('\n') + result_lines = [] + in_code_block = False + in_table = False + table_buffer = [] + + def flush_table_buffer(): + nonlocal result_lines, table_buffer, in_table + if not table_buffer: + in_table = False + return + + if len(table_buffer) >= 2 and self._is_table_separator(table_buffer[1]): + formatted_lines = self.format_table(table_buffer) + result_lines.extend(formatted_lines) + + else: + for line in table_buffer: result_lines.append(self.format_line(line)) + + table_buffer = [] + in_table = False + + for line in lines: + is_fence, _ = self._detect_code_fence(line) + + if is_fence: + # Flush any pending table before code fence + flush_table_buffer() + + if not in_code_block: + in_code_block = True + result_lines.append(self.LITERAL_START) + + else: + result_lines.append(self.LITERAL_END) + in_code_block = False + + else: + if in_code_block: result_lines.append(self._escape_literals(line)) + else: + if self._is_table_row(line): + if not in_table: + in_table = True + table_buffer = [line] + + else: table_buffer.append(line) + + else: + # Line breaks table - flush buffer + if in_table: flush_table_buffer() + formatted = self.format_line(line) + result_lines.append(formatted) + + # Handle unclosed structures + if in_table: flush_table_buffer() + if in_code_block: result_lines.append(self.LITERAL_END) + + return '\n'.join(result_lines) + + def format_line(self, line: str, mode: str = "normal") -> str: + if mode == "codeblock": return self._escape_literals(line) + + if self.HORIZONTAL_RULE_RE.match(line): return self._format_horizontal_rule() + + header_match = self.HEADER_RE.match(line) + if header_match: return self._format_header(header_match) + + list_match = self.UNORDERED_LIST_RE.match(line) + if list_match: return self._format_list_item(list_match) + + line = self._format_inline(line) + + return line + + def _format_inline(self, text: str) -> str: + code_blocks = [] + def extract_code(match): + code_blocks.append(match.group(1)) + return f"\x00{len(code_blocks)-1}\x00" + + text = self.INLINE_CODE_RE.sub(extract_code, text) + text = self.BOLD_RE.sub(self._bold_sub, text) + text = self.ITALIC_RE.sub(self._italic_sub, text) + text = self.LINK_RE.sub(self._link_sub, text) + + def restore_code(match): + idx = int(match.group(1)) + content = code_blocks[idx] + # Escape any backticks in the content + content = content.replace('`', '\\`') + return f"{self.CODE_BG}{self.CODE_FG}{content}{self.CODE_RESET}" + + text = re.sub(r'\x00(\d+)\x00', restore_code, text) + return text + + def _bold_sub(self, match: re.Match) -> str: + content = match.group(1) or match.group(2) + return f"{self.BOLD}{content}{self.BOLD_END}" + + def _italic_sub(self, match: re.Match) -> str: + content = match.group(1) or match.group(2) + return f"{self.ITALIC}{content}{self.ITALIC_END}" + + def _link_sub(self, match: re.Match) -> str: + text = match.group(1) + url = match.group(2) + # TODO: Evaluate best way to handle both normal and nomadnet URLs + text = text.replace('`', '') + # url = url.replace('`', '\\`') + return f"`[{text}`{url}]" + + def _format_header(self, match: re.Match) -> str: + hashes = match.group(1) + content = match.group(2) + level = len(hashes) + prefix = ">" * min(level, 6) + return f"{prefix}{content}" + + def _format_list_item(self, match: re.Match) -> str: + indent = match.group(1) + content = match.group(3) + content = self._format_inline(content) + return f"{indent} {self.BULLET} {content}" + + def _format_horizontal_rule(self) -> str: + return "-" + + def _detect_code_fence(self, line: str) -> tuple[bool, str]: + match = self.CODE_FENCE_RE.match(line) + if match: return True, match.group(1) + return False, "" + + def _is_table_row(self, line: str) -> bool: + if '|' not in line: return False + match = self.TABLE_ROW_RE.match(line) + if match is None: return False + content = match.group(1) + return '|' in content or line.strip().startswith('|') + + def _is_table_separator(self, line: str) -> bool: + if '|' not in line: return False + match = self.TABLE_SEP_RE.match(line) + return match is not None + + def _escape_literals(self, text: str) -> str: + return text.replace('`', '\\`') + + def format_table(self, rows: List[str]) -> List[str]: + if len(rows) < 2: return rows + + # Parse header and separator + header_cells = self._parse_table_row(rows[0]) + alignments = self._parse_table_alignments(rows[1]) + + # Ensure alignment count matches header cells + while len(alignments) < len(header_cells): alignments.append('left') + alignments = alignments[:len(header_cells)] + + # Parse data rows + data_rows = [] + for i in range(2, len(rows)): + cells = self._parse_table_row(rows[i]) + while len(cells) < len(header_cells): cells.append("") + cells = cells[:len(header_cells)] + data_rows.append(cells) + + # Calculate column widths based on content + num_cols = len(header_cells) + col_widths = [0] * num_cols + + all_rows = [header_cells] + data_rows + for row in all_rows: + for i, cell in enumerate(row): + formatted = self._format_inline(cell) + # Calculate visible width (excluding Micron tags) + width = self._visible_width(formatted) + col_widths[i] = max(col_widths[i], width) + + # Apply minimum width and calculate total + col_widths = [max(w, self.TABLE_MIN_COL_WIDTH) for w in col_widths] + + # Check max_width constraint + # Total = sum of columns + 3 chars per column (space + 2 borders) + 1 for final border + total_width = sum(col_widths) + (num_cols * 3) + 1 + + if total_width > self.max_width: + # Reduce widest columns proportionally + excess = total_width - self.max_width + indexed_widths = [(i, w) for i, w in enumerate(col_widths)] + indexed_widths.sort(key=lambda x: -x[1]) + + for i, w in indexed_widths: + if excess <= 0: break + reduction = min(excess, w - self.TABLE_MIN_COL_WIDTH) + col_widths[i] -= reduction + excess -= reduction + + # Build formatted table + result = [] + + # Center alignment start + result.append("`c") + + # Top border + border = self.TABLE_TL + for i, w in enumerate(col_widths): + border += self.TABLE_H * (w + 2) + if i < len(col_widths) - 1: border += self.TABLE_TM + else: border += self.TABLE_TR + + result.append(self._escape_literals(border)) + + # Header row + header_line = self.TABLE_V + for i, cell in enumerate(header_cells): + formatted = self._format_inline(cell) + padded = self._pad_cell(formatted, col_widths[i], 'left') + header_line += f" {padded} {self.TABLE_V}" + result.append(self._escape_literals(header_line)) + + # Separator row + sep_line = self.TABLE_ML + for i, w in enumerate(col_widths): + cell_width = w + 2 + if alignments[i] == 'center': sep_line += f":{self.TABLE_H * (cell_width - 2)}:" + elif alignments[i] == 'right': sep_line += f"{self.TABLE_H * (cell_width - 1)}:" + else: sep_line += self.TABLE_H * cell_width + + if i < len(col_widths) - 1: sep_line += self.TABLE_MM + else: sep_line += self.TABLE_MR + + result.append(self._escape_literals(sep_line)) + + # Data rows + for row in data_rows: + row_line = self.TABLE_V + for i, cell in enumerate(row): + formatted = self._format_inline(cell) + padded = self._pad_cell(formatted, col_widths[i], alignments[i]) + row_line += f" {padded} {self.TABLE_V}" + + result.append(row_line) + + # Bottom border + border = self.TABLE_BL + for i, w in enumerate(col_widths): + border += self.TABLE_H * (w + 2) + if i < len(col_widths) - 1: border += self.TABLE_BM + else: border += self.TABLE_BR + + result.append(self._escape_literals(border)) + + # End center alignment + result.append("`a") + + return result + + def _parse_table_row(self, line: str) -> List[str]: + line = line.strip() + if line.startswith('|'): line = line[1:] + if line.endswith('|'): line = line[:-1] + + cells = [] + current = "" + escaped = False + for char in line: + if escaped: + current += char + escaped = False + elif char == '\\': + escaped = True + elif char == '|': + cells.append(current.strip()) + current = "" + else: + current += char + + cells.append(current.strip()) + return cells + + def _parse_table_alignments(self, line: str) -> List[str]: + cells = self._parse_table_row(line) + alignments = [] + for cell in cells: + cell = cell.strip() + if cell.startswith(':') and cell.endswith(':'): alignments.append('center') + elif cell.endswith(':'): alignments.append('right') + else: alignments.append('left') + + return alignments + + def _visible_width(self, text: str) -> int: + # Remove Micron tags + text = re.sub(r'`[FB][0-9a-fA-F]{3}', '', text) + text = re.sub(r'`[!*_]', '', text) + text = re.sub(r'`f`b', '', text) + text = re.sub(r'`f', '', text) + text = re.sub(r'`b', '', text) + return len(text) + + def _pad_cell(self, text: str, width: int, align: str) -> str: + text = self._truncate_cell(text, width) + text_width = self._visible_width(text) + padding = width - text_width + + if align == 'right': + return " " * padding + text + elif align == 'center': + left = padding // 2 + right = padding - left + return " " * left + text + " " * right + else: + return text + " " * padding + + def _truncate_cell(self, text: str, width: int) -> str: + if self._visible_width(text) <= width: return text + + stripped = text + stripped = re.sub(r'`[FB][0-9a-fA-F]{3}', '', stripped) + stripped = re.sub(r'`[!*_]', '', stripped) + stripped = re.sub(r'`f`b', '', stripped) + + if len(stripped) <= width - 1: return text + + truncated = stripped[:width - 1] + "โ€ฆ" + return truncated + + +def convert_markdown_to_micron(text: str) -> str: + converter = MarkdownToMicron() + return converter.format_block(text)