mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-06-22 03:58:51 -07:00
1415 lines
65 KiB
Python
1415 lines
65 KiB
Python
# Reticulum License
|
|
#
|
|
# Copyright (c) 2016-2026 Mark Qvist
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# - The Software shall not be used in any kind of system which includes amongst
|
|
# its functions the ability to purposefully do harm to human beings.
|
|
#
|
|
# - The Software shall not be used, directly or indirectly, in the creation of
|
|
# an artificial intelligence, machine learning or language model training
|
|
# dataset, including but not limited to any use that contributes to the
|
|
# training or development of such a model or algorithm.
|
|
#
|
|
# - The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
|
|
import os
|
|
import time
|
|
import subprocess
|
|
import urllib.parse
|
|
import RNS
|
|
from RNS.Utilities.rngit import APP_NAME
|
|
from RNS.Utilities.rngit.util import MarkdownToMicron
|
|
from RNS.vendor.configobj import ConfigObj
|
|
from RNS._version import __version__
|
|
|
|
class NomadNetworkNode():
|
|
APP_NAME = "nomadnetwork"
|
|
JOBS_INTERVAL = 5
|
|
|
|
PATH_INDEX = "/page/index.mu"
|
|
PATH_GROUP = "/page/group.mu"
|
|
PATH_REPO = "/page/repo.mu"
|
|
PATH_TREE = "/page/tree.mu"
|
|
PATH_BLOB = "/page/blob.mu"
|
|
PATH_COMMITS = "/page/commits.mu"
|
|
PATH_COMMIT = "/page/commit.mu"
|
|
PATH_REFS = "/page/refs.mu"
|
|
|
|
BLOB_SIZE_LIMIT = 256 * 1024
|
|
TREE_ENTRIES_PER_PAGE = 100
|
|
COMMITS_PER_PAGE = 20
|
|
SHOW_DIFF_BY_DEFAULT = True
|
|
GIT_COMMAND_TIMEOUT = 5
|
|
MAX_RENDER_WIDTH = 100
|
|
|
|
def __init__(self, owner=None):
|
|
if not owner: raise TypeError(f"Invalid owner {owner} for {self}")
|
|
|
|
self._ready = False
|
|
self._should_run = False
|
|
self.owner = owner
|
|
self.identity = owner.identity
|
|
self.node_name = owner.node_name
|
|
self.announce_interval = owner.announce_interval
|
|
self.last_announce = 0
|
|
self.null_ident = RNS.Identity.from_bytes(bytes(64))
|
|
self.mdc = MarkdownToMicron(max_width=self.MAX_RENDER_WIDTH)
|
|
self.templates = {}
|
|
|
|
self.templates["base"] = DEFAULT_BASE_TEMPLATE
|
|
self.templates["front"] = DEFAULT_FRONT_TEMPLATE
|
|
self.templates["group"] = DEFAULT_GROUP_TEMPLATE
|
|
|
|
self.destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, self.APP_NAME, "node")
|
|
self.destination.set_link_established_callback(self.remote_connected)
|
|
self.destination.set_default_app_data(self.get_announce_app_data)
|
|
self.register_request_handlers()
|
|
|
|
RNS.log(f"Git Nomad Network Node listening on {RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_NOTICE)
|
|
|
|
self._should_run = True
|
|
self._ready = True
|
|
|
|
def jobs(self):
|
|
while self._should_run:
|
|
time.sleep(self.JOBS_INTERVAL)
|
|
try:
|
|
if self.announce_interval and time.time() > self.last_announce + self.announce_interval: self.announce()
|
|
|
|
except Exception as e: RNS.log(f"Error while running periodic jobs: {e}", RNS.LOG_ERROR)
|
|
|
|
def get_announce_app_data(self): return self.node_name.encode("utf-8")
|
|
|
|
def announce(self):
|
|
self.last_announce = time.time()
|
|
self.destination.announce()
|
|
self.app.message_router.announce_propagation_node()
|
|
|
|
def resolve_permission(self, remote_identity, group_name, repository_name, permission):
|
|
# Since the nomadnet page protocol doesn't *require* authentication,
|
|
# we use null_ident in case the remote hasn't identified.
|
|
if not remote_identity: remote_identity = self.null_ident
|
|
return self.owner.resolve_permission(remote_identity, group_name, repository_name, permission)
|
|
|
|
def register_request_handlers(self):
|
|
self.destination.register_request_handler(self.PATH_INDEX, response_generator=self.serve_front_page, allow=RNS.Destination.ALLOW_ALL)
|
|
self.destination.register_request_handler(self.PATH_GROUP, response_generator=self.serve_group_page, allow=RNS.Destination.ALLOW_ALL)
|
|
self.destination.register_request_handler(self.PATH_REPO, response_generator=self.serve_repo_page, allow=RNS.Destination.ALLOW_ALL)
|
|
self.destination.register_request_handler(self.PATH_TREE, response_generator=self.serve_tree_page, allow=RNS.Destination.ALLOW_ALL)
|
|
self.destination.register_request_handler(self.PATH_BLOB, response_generator=self.serve_blob_page, allow=RNS.Destination.ALLOW_ALL)
|
|
self.destination.register_request_handler(self.PATH_COMMITS, response_generator=self.serve_commits_page, allow=RNS.Destination.ALLOW_ALL)
|
|
self.destination.register_request_handler(self.PATH_COMMIT, response_generator=self.serve_commit_page, allow=RNS.Destination.ALLOW_ALL)
|
|
self.destination.register_request_handler(self.PATH_REFS, response_generator=self.serve_refs_page, allow=RNS.Destination.ALLOW_ALL)
|
|
|
|
def render_template(self, page_content, nav_content=None, template=None, st=None):
|
|
if template and template in self.templates:
|
|
template = self.templates[template]
|
|
page_content = template.replace("{PAGE_CONTENT}", page_content)
|
|
|
|
base_template = self.templates["base"]
|
|
base_template = base_template.replace("{PAGE_CONTENT}", page_content)
|
|
base_template = base_template.replace("{NODE_NAME}", self.node_name)
|
|
base_template = base_template.replace("{VERSION}", __version__)
|
|
|
|
if nav_content: base_template = base_template.replace("{NAVIGATION}", nav_content)
|
|
else: base_template = base_template.replace("{NAVIGATION}", "")
|
|
|
|
gt = f"Generated in {RNS.prettytime(time.time()-st)}" if st else "Unknown generation time"
|
|
base_template = base_template.replace("{GEN_TIME}", gt)
|
|
return base_template.encode("utf-8")
|
|
|
|
#############################
|
|
# Micron Generation Helpers #
|
|
#############################
|
|
|
|
def m_heading(self, text, level=1): return ">" * level + text + "\n"
|
|
def m_bold(self, text): return f"`!{text}`!"
|
|
def m_italic(self, text): return f"`*{text}`*"
|
|
def m_underline(self, text): return f"`_{text}`_"
|
|
def m_color_fg(self, text, color): return f"`F{color}{text}`f"
|
|
def m_divider(self, char="\u2500"): return f"-{char}\n"
|
|
def m_escape(self, text): return text.replace("`", "\\`")
|
|
|
|
def m_link(self, _label, _path, **fields):
|
|
def sanitize_v(value): return urllib.parse.quote_plus(str(value).encode("utf-8"))
|
|
def sanitize_label(value): return value.replace("[", "").replace("]", "").replace("`", "")
|
|
field_str = ""
|
|
if fields:
|
|
field_parts = []
|
|
for k, v in fields.items(): field_parts.append(f"{k}={sanitize_v(v)}")
|
|
field_str = "`" + "|".join(field_parts)
|
|
return f"`[{sanitize_label(_label)}`:{_path}{field_str}]"
|
|
|
|
def m_align(self, text, align="left"):
|
|
align_tag = {"center": "c", "left": "l", "right": "r"}.get(align, "a")
|
|
return f"`{align_tag}{text}`a"
|
|
|
|
|
|
#########################
|
|
# Page Serving Handlers #
|
|
#########################
|
|
|
|
def serve_front_page(self, path, data, request_id, link_id, remote_identity, requested_at):
|
|
st = time.time()
|
|
RNS.log(f"Front page request from {remote_identity}", RNS.LOG_DEBUG)
|
|
|
|
content_parts = []
|
|
nav_parts = []
|
|
|
|
accessible_groups = self.get_accessible_groups(remote_identity)
|
|
|
|
breadcrumb = f"{self.m_link("Node", self.PATH_INDEX)} /"
|
|
nav_parts.append(self.m_align(breadcrumb, "left") + "\n")
|
|
|
|
if not accessible_groups: content_parts.append("No repository groups available.\n")
|
|
else:
|
|
for group_name in sorted(accessible_groups.keys()):
|
|
group = accessible_groups[group_name]
|
|
repo_count = len(group.get("repositories", {}))
|
|
repo_word = "repository" if repo_count == 1 else "repositories"
|
|
|
|
link = self.m_link(f" {self.mdc.BULLET} {group_name}", self.PATH_GROUP, g=group_name)
|
|
content_parts.append(f"{link} ({repo_count} {repo_word})\n")
|
|
|
|
page_content = "".join(content_parts)
|
|
nav_content = "".join(nav_parts)
|
|
return self.render_template(page_content, nav_content=nav_content, template="front", st=st)
|
|
|
|
def serve_group_page(self, path, data, request_id, link_id, remote_identity, requested_at):
|
|
st = time.time()
|
|
RNS.log(f"Group page request from {remote_identity}", RNS.LOG_DEBUG)
|
|
|
|
group_name = data.get("var_g", "") if data else ""
|
|
accessible_repos = self.get_accessible_repositories(remote_identity, group_name)
|
|
|
|
if not group_name or group_name not in self.owner.groups or not accessible_repos:
|
|
content = self.m_heading("Group Not Found", 1) + "\n\nThe requested group does not exist or you do not have access to any repositories in it.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
content_parts = []
|
|
nav_parts = []
|
|
|
|
breadcrumb = f"{self.m_link("Node", self.PATH_INDEX)} / {group_name}"
|
|
nav_parts.append(self.m_align(breadcrumb, "left") + "\n")
|
|
|
|
if not accessible_repos: content_parts.append("No repositories available.\n")
|
|
else:
|
|
content_parts.append(self.m_heading("Repositories", 1))
|
|
content_parts.append("\n")
|
|
|
|
for repo_name in sorted(accessible_repos.keys()):
|
|
repo = accessible_repos[repo_name]
|
|
|
|
description = self.get_repository_description(repo["path"])
|
|
|
|
link = self.m_link(f" {self.mdc.BULLET} {repo_name}", self.PATH_REPO, g=group_name, r=repo_name)
|
|
content_parts.append(f"{link}")
|
|
if description: content_parts.append(f" - {description}\n")
|
|
else: content_parts.append("\n")
|
|
|
|
page_content = "".join(content_parts)
|
|
nav_content = "".join(nav_parts)
|
|
return self.render_template(page_content, nav_content=nav_content, template="group", st=st)
|
|
|
|
def serve_repo_page(self, path, data, request_id, link_id, remote_identity, requested_at):
|
|
st = time.time()
|
|
RNS.log(f"Repository page request from {remote_identity}", RNS.LOG_DEBUG)
|
|
|
|
group_name = data.get("var_g", "") if data else ""
|
|
repo_name = data.get("var_r", "") if data else ""
|
|
repo = self.get_accessible_repository(remote_identity, group_name, repo_name)
|
|
|
|
if not repo:
|
|
content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
content_parts = []
|
|
|
|
# Breadcrumb navigation
|
|
breadcrumb = f"{self.m_link("Node", self.PATH_INDEX)} / {self.m_link(group_name, self.PATH_GROUP, g=group_name)} / {repo_name}"
|
|
content_parts.append(self.m_align(breadcrumb, "left") + "\n\n")
|
|
|
|
content_parts.append(self.m_heading(repo_name, 1))
|
|
content_parts.append("\n")
|
|
|
|
# Repository metadata
|
|
description = self.get_repository_description(repo["path"])
|
|
if description: content_parts.append(f"{self.m_escape(description)}\n\n")
|
|
|
|
# Navigation links
|
|
content_parts.append(self.m_heading("Browse", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(f" {self.m_link('📁 Files', self.PATH_TREE, g=group_name, r=repo_name, ref='HEAD')}\n")
|
|
content_parts.append(f" {self.m_link('📝 Commits', self.PATH_COMMITS, g=group_name, r=repo_name, ref='HEAD')}\n")
|
|
content_parts.append(f" {self.m_link('🏷️ Refs', self.PATH_REFS, g=group_name, r=repo_name)}\n")
|
|
content_parts.append("\n")
|
|
|
|
# Get refs information
|
|
refs = self.get_repository_refs(repo["path"])
|
|
if refs:
|
|
content_parts.append(self.m_heading("Branches", 2))
|
|
content_parts.append("\n")
|
|
|
|
# Display first 10 branches
|
|
for ref in refs.get("heads", [])[:10]: content_parts.append(f" {ref['name']}: {ref['short_hash']}\n")
|
|
if len(refs.get("heads", [])) > 10: content_parts.append(f" ... and {len(refs['heads']) - 10} more\n")
|
|
content_parts.append("\n")
|
|
|
|
if refs.get("tags"):
|
|
content_parts.append(self.m_heading("Tags", 2))
|
|
content_parts.append("\n")
|
|
|
|
# Display first 10 tags
|
|
for ref in refs["tags"][:10]: content_parts.append(f" {ref['name']}: {ref['short_hash']}\n")
|
|
if len(refs["tags"]) > 10: content_parts.append(f" ... and {len(refs['tags']) - 10} more\n")
|
|
content_parts.append("\n")
|
|
|
|
# Readme content
|
|
readme_content, readme_is_markdown = self.get_readme_content(repo["path"])
|
|
if readme_content is not None:
|
|
content_parts.append(self.m_heading("README", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(self.m_divider())
|
|
content_parts.append("\n")
|
|
|
|
if readme_is_markdown:
|
|
converted = self.mdc.format_block(readme_content)
|
|
content_parts.append(converted)
|
|
|
|
else: content_parts.append(f"`=\n{readme_content}\n`=")
|
|
|
|
content_parts.append("\n")
|
|
content_parts.append(self.m_divider())
|
|
|
|
else:
|
|
content_parts.append(self.m_italic("No README file found in this repository."))
|
|
content_parts.append("\n")
|
|
|
|
content_parts.append("\n")
|
|
|
|
page_content = "".join(content_parts)
|
|
return self.render_template(page_content, st=st)
|
|
|
|
def serve_tree_page(self, path, data, request_id, link_id, remote_identity, requested_at):
|
|
st = time.time()
|
|
RNS.log(f"Tree page request from {remote_identity}", RNS.LOG_DEBUG)
|
|
|
|
group_name = data.get("var_g", "") if data else ""
|
|
repo_name = data.get("var_r", "") if data else ""
|
|
ref = data.get("var_ref", "HEAD") if data else "HEAD"
|
|
tree_path = data.get("var_path", "") if data else ""
|
|
tree_path = urllib.parse.unquote_plus(tree_path)
|
|
page_num = 0
|
|
|
|
try:
|
|
page_str = data.get("var_page", "0") if data else "0"
|
|
page_num = max(0, int(page_str))
|
|
|
|
except (ValueError, TypeError): page_num = 0
|
|
|
|
repo = self.get_accessible_repository(remote_identity, group_name, repo_name)
|
|
if not repo:
|
|
content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
repo_path = repo["path"]
|
|
|
|
# Validate ref exists
|
|
resolved_ref = self.resolve_ref(repo_path, ref)
|
|
if not resolved_ref:
|
|
content = self.m_heading("Ref Not Found", 1) + f"\n\nThe ref '{ref}' does not exist in this repository.\n"
|
|
content += f"\n" + self.m_link("View All Refs", self.PATH_REFS, g=group_name, r=repo_name) + "\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
content_parts = []
|
|
|
|
# Breadcrumb navigation
|
|
breadcrumb_parts = [ self.m_link("Node", self.PATH_INDEX),
|
|
self.m_link(group_name, self.PATH_GROUP, g=group_name),
|
|
self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name) ]
|
|
|
|
# Add path components to breadcrumb
|
|
if tree_path:
|
|
path_components = tree_path.strip("/").split("/")
|
|
current_path = ""
|
|
for i, component in enumerate(path_components):
|
|
current_path = current_path + "/" + component if current_path else component
|
|
if i == len(path_components) - 1: breadcrumb_parts.append(component) # Last component not a link
|
|
else: breadcrumb_parts.append(self.m_link(component, self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=current_path))
|
|
|
|
else: breadcrumb_parts.append("") # Could be "root" or something, but a bit confusing
|
|
|
|
breadcrumb = " / ".join(breadcrumb_parts)
|
|
content_parts.append(self.m_align(breadcrumb, "left") + "\n\n")
|
|
|
|
content_parts.append(self.m_heading(f"Tree: {tree_path or 'root'}", 1))
|
|
content_parts.append(f"`F666Ref: {ref} ({resolved_ref[:8]})`f\n\n")
|
|
|
|
# Get tree entries
|
|
entries = self.get_tree_entries(repo_path, resolved_ref, tree_path)
|
|
|
|
if entries is None: content_parts.append("Error reading directory contents.\n")
|
|
elif not entries: content_parts.append("Empty directory.\n")
|
|
else:
|
|
# Sort: Directories first, then files, both alphabetically
|
|
def sort_key(entry):
|
|
is_dir = entry["type"] in ("tree", "commit") # commit = submodule
|
|
return (not is_dir, entry["name"].lower())
|
|
|
|
entries.sort(key=sort_key)
|
|
|
|
# Pagination
|
|
total_entries = len(entries)
|
|
start_idx = page_num * self.TREE_ENTRIES_PER_PAGE
|
|
end_idx = start_idx + self.TREE_ENTRIES_PER_PAGE
|
|
page_entries = entries[start_idx:end_idx]
|
|
|
|
if total_entries > self.TREE_ENTRIES_PER_PAGE:
|
|
content_parts.append(f"`F666Showing {start_idx + 1}-{min(end_idx, total_entries)} of {total_entries} entries`f\n\n")
|
|
|
|
content_parts.append(self.m_heading("Contents", 2))
|
|
content_parts.append("\n")
|
|
|
|
# Parent directory link (if not at root)
|
|
if tree_path:
|
|
parent_path = "/".join(tree_path.rstrip("/").split("/")[:-1])
|
|
parent_link = self.m_link("../ (parent directory)", self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=parent_path)
|
|
content_parts.append(f" {parent_link}\n")
|
|
|
|
for entry in page_entries:
|
|
entry_name = entry["name"]
|
|
entry_type = entry["type"]
|
|
entry_mode = entry.get("mode", "")
|
|
|
|
# Directory
|
|
if entry_type == "tree":
|
|
subpath = tree_path + "/" + entry_name if tree_path else entry_name
|
|
link = self.m_link(entry_name + "/", self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=subpath)
|
|
content_parts.append(f" `F66d📁`f {link}\n")
|
|
|
|
# Submodule
|
|
elif entry_type == "commit":
|
|
content_parts.append(f" `F66d⧉`f {entry_name} `F666(submodule)`f\n")
|
|
|
|
# Symlink
|
|
elif entry_type == "link":
|
|
target = entry.get("link_target", "unknown")
|
|
content_parts.append(f" `F66d↳`f {entry_name} `F666→ {self.m_escape(target)}`f\n")
|
|
|
|
# File (blob)
|
|
else:
|
|
size_str = self.format_size(entry.get("size", 0))
|
|
subpath = tree_path + "/" + entry_name if tree_path else entry_name
|
|
link = self.m_link(entry_name, self.PATH_BLOB, g=group_name, r=repo_name, ref=ref, path=subpath)
|
|
content_parts.append(f" `F66d📄`f {link} `F666({size_str})`f\n")
|
|
|
|
content_parts.append("\n")
|
|
|
|
# Pagination controls
|
|
if total_entries > self.TREE_ENTRIES_PER_PAGE:
|
|
content_parts.append(self.m_heading("Navigation", 2))
|
|
content_parts.append("\n")
|
|
|
|
nav_links = []
|
|
if page_num > 0:
|
|
nav_links.append(self.m_link("« Previous", self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=tree_path, page=page_num - 1))
|
|
|
|
total_pages = (total_entries + self.TREE_ENTRIES_PER_PAGE - 1) // self.TREE_ENTRIES_PER_PAGE
|
|
nav_links.append(f"Page {page_num + 1} of {total_pages}")
|
|
|
|
if end_idx < total_entries:
|
|
nav_links.append(self.m_link("Next »", self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=tree_path, page=page_num + 1))
|
|
|
|
content_parts.append(" " + " | ".join(nav_links) + "\n")
|
|
|
|
if content_parts[-1] == "\n": content_parts[-1] = ""
|
|
|
|
page_content = "".join(content_parts)
|
|
return self.render_template(page_content, st=st)
|
|
|
|
def serve_blob_page(self, path, data, request_id, link_id, remote_identity, requested_at):
|
|
st = time.time()
|
|
RNS.log(f"Blob page request from {remote_identity}", RNS.LOG_DEBUG)
|
|
|
|
group_name = data.get("var_g", "") if data else ""
|
|
repo_name = data.get("var_r", "") if data else ""
|
|
ref = data.get("var_ref", "HEAD") if data else "HEAD"
|
|
file_path = data.get("var_path", "") if data else ""
|
|
file_path = urllib.parse.unquote_plus(file_path)
|
|
|
|
repo = self.get_accessible_repository(remote_identity, group_name, repo_name)
|
|
if not repo:
|
|
content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
repo_path = repo["path"]
|
|
|
|
# Validate ref exists
|
|
resolved_ref = self.resolve_ref(repo_path, ref)
|
|
if not resolved_ref:
|
|
content = self.m_heading("Ref Not Found", 1) + f"\n\nThe ref '{ref}' does not exist in this repository.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
# Validate file path
|
|
if not file_path:
|
|
content = self.m_heading("Invalid Path", 1) + "\n\nNo file path specified.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
content_parts = []
|
|
|
|
# Breadcrumb navigation
|
|
breadcrumb_parts = [ self.m_link("Node", self.PATH_INDEX),
|
|
self.m_link(group_name, self.PATH_GROUP, g=group_name),
|
|
self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name) ]
|
|
|
|
# Add path components
|
|
path_components = file_path.strip("/").split("/")
|
|
current_path = ""
|
|
for i, component in enumerate(path_components):
|
|
current_path = current_path + "/" + component if current_path else component
|
|
if i == len(path_components) - 1: breadcrumb_parts.append(component) # Last component (file) not a link
|
|
else: breadcrumb_parts.append(self.m_link(component, self.PATH_TREE, g=group_name, r=repo_name, ref=ref, path=current_path))
|
|
|
|
breadcrumb = " / ".join(breadcrumb_parts)
|
|
content_parts.append(self.m_align(breadcrumb, "left") + "\n\n")
|
|
|
|
content_parts.append(self.m_heading(file_path, 1))
|
|
content_parts.append(f"`F666Ref: {ref} ({resolved_ref[:8]})`f\n\n")
|
|
|
|
# Get blob info
|
|
blob_info = self.get_blob_info(repo_path, resolved_ref, file_path)
|
|
|
|
if blob_info is None: content_parts.append("File not found at this ref.\n")
|
|
else:
|
|
size = blob_info.get("size", 0)
|
|
is_binary = blob_info.get("is_binary", False)
|
|
is_symlink = blob_info.get("is_symlink", False)
|
|
symlink_target = blob_info.get("symlink_target")
|
|
|
|
# File metadata
|
|
content_parts.append(self.m_heading("File Info", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(f" Size: {self.format_size(size)} ({size} bytes)\n")
|
|
content_parts.append(f" Type: {'Binary' if is_binary else 'Text'}\n")
|
|
if is_symlink: content_parts.append(f" Symlink: → {self.m_escape(symlink_target or 'unknown')}\n")
|
|
content_parts.append("\n")
|
|
|
|
# Content display
|
|
if is_symlink:
|
|
content_parts.append(self.m_heading("Symlink Target", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(f" {self.m_escape(symlink_target or 'unknown')}\n")
|
|
|
|
elif is_binary:
|
|
content_parts.append(self.m_heading("Binary File", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append("This file appears to be binary and cannot be displayed as text.\n")
|
|
# TODO: Implement raw file downloads
|
|
|
|
elif size > self.BLOB_SIZE_LIMIT:
|
|
content_parts.append(self.m_heading("File Too Large", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(f"This file is {self.format_size(size)}, which exceeds the display limit of {self.format_size(self.BLOB_SIZE_LIMIT)}.\n")
|
|
|
|
else:
|
|
# Display file content
|
|
content = self.get_blob_content(repo_path, resolved_ref, file_path)
|
|
if content is not None:
|
|
content_parts.append(self.m_heading("Content", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(f"`=\n{content}\n`=")
|
|
else:
|
|
content_parts.append("Error reading file content.\n")
|
|
|
|
page_content = "".join(content_parts)
|
|
return self.render_template(page_content, st=st)
|
|
|
|
def serve_commits_page(self, path, data, request_id, link_id, remote_identity, requested_at):
|
|
st = time.time()
|
|
RNS.log(f"Commits page request from {remote_identity}", RNS.LOG_DEBUG)
|
|
|
|
group_name = data.get("var_g", "") if data else ""
|
|
repo_name = data.get("var_r", "") if data else ""
|
|
ref = data.get("var_ref", "HEAD") if data else "HEAD"
|
|
file_path = data.get("var_path", "") if data else ""
|
|
file_path = urllib.parse.unquote_plus(file_path)
|
|
page_num = 0
|
|
|
|
try:
|
|
page_str = data.get("var_page", "0") if data else "0"
|
|
page_num = max(0, int(page_str))
|
|
|
|
except (ValueError, TypeError): page_num = 0
|
|
|
|
repo = self.get_accessible_repository(remote_identity, group_name, repo_name)
|
|
if not repo:
|
|
content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
repo_path = repo["path"]
|
|
|
|
# Validate ref exists
|
|
resolved_ref = self.resolve_ref(repo_path, ref)
|
|
if not resolved_ref:
|
|
content = self.m_heading("Ref Not Found", 1) + f"\n\nThe ref '{ref}' does not exist in this repository.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
content_parts = []
|
|
|
|
# Breadcrumb navigation
|
|
breadcrumb_parts = [ self.m_link("Node", self.PATH_INDEX),
|
|
self.m_link(group_name, self.PATH_GROUP, g=group_name),
|
|
self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name),
|
|
"commits" ]
|
|
|
|
if file_path: breadcrumb_parts.insert(3, f"{self.m_escape(file_path)}")
|
|
|
|
breadcrumb = " / ".join(breadcrumb_parts)
|
|
content_parts.append(self.m_align(breadcrumb, "left") + "\n\n")
|
|
|
|
title_suffix = f" ({file_path})" if file_path else ""
|
|
content_parts.append(self.m_heading(f"Commit History{title_suffix}", 1))
|
|
content_parts.append(f"`F666Ref: {ref} ({resolved_ref[:8]})`f\n\n")
|
|
|
|
# Get commits
|
|
skip = page_num * self.COMMITS_PER_PAGE
|
|
commits = self.get_commits(repo_path, resolved_ref, file_path, skip, self.COMMITS_PER_PAGE)
|
|
|
|
if commits is None: content_parts.append("Error reading commit history.\n")
|
|
elif not commits: content_parts.append("No commits found.\n")
|
|
else:
|
|
content_parts.append(self.m_heading("Commits", 2))
|
|
content_parts.append("\n")
|
|
|
|
for commit in commits:
|
|
short_hash = commit["hash"][:7]
|
|
subject = commit["subject"]
|
|
author = commit["author"]
|
|
date = self.format_relative_time(commit["timestamp"])
|
|
|
|
hash_link = self.m_link(short_hash, self.PATH_COMMIT, g=group_name, r=repo_name, h=commit["hash"])
|
|
|
|
content_parts.append(f"`F66d{hash_link}`f {self.m_escape(author)} - {date}\n")
|
|
content_parts.append(f"{self.m_escape(subject)}\n\n")
|
|
|
|
# Pagination controls
|
|
has_more = len(commits) == self.COMMITS_PER_PAGE
|
|
|
|
if page_num > 0 or has_more:
|
|
content_parts.append(self.m_heading("Navigation", 2))
|
|
content_parts.append("\n")
|
|
|
|
nav_links = []
|
|
if page_num > 0: nav_links.append(self.m_link("« Newer", self.PATH_COMMITS, g=group_name, r=repo_name, ref=ref, path=file_path, page=page_num - 1))
|
|
nav_links.append(f"Page {page_num + 1}")
|
|
if has_more: nav_links.append(self.m_link("Older »", self.PATH_COMMITS, g=group_name, r=repo_name, ref=ref, path=file_path, page=page_num + 1))
|
|
content_parts.append(" " + " | ".join(nav_links) + "\n")
|
|
|
|
page_content = "".join(content_parts)
|
|
return self.render_template(page_content, st=st)
|
|
|
|
def serve_commit_page(self, path, data, request_id, link_id, remote_identity, requested_at):
|
|
st = time.time()
|
|
RNS.log(f"Commit page request from {remote_identity}", RNS.LOG_DEBUG)
|
|
|
|
group_name = data.get("var_g", "") if data else ""
|
|
repo_name = data.get("var_r", "") if data else ""
|
|
commit_hash = data.get("var_h", "") if data else ""
|
|
|
|
repo = self.get_accessible_repository(remote_identity, group_name, repo_name)
|
|
if not repo:
|
|
content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
repo_path = repo["path"]
|
|
|
|
# Validate commit hash
|
|
if not commit_hash or len(commit_hash) < 7:
|
|
content = self.m_heading("Invalid Commit", 1) + "\n\nNo valid commit hash specified.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
# Resolve and validate the commit hash
|
|
resolved_hash = self.resolve_ref(repo_path, commit_hash)
|
|
if not resolved_hash:
|
|
content = self.m_heading("Commit Not Found", 1) + f"\n\nThe commit '{commit_hash}' does not exist in this repository.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
# Verify it's actually a commit object
|
|
try:
|
|
type_result = subprocess.run(["git", "cat-file", "-t", resolved_hash],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if type_result.returncode != 0 or type_result.stdout.strip() != "commit":
|
|
content = self.m_heading("Invalid Object", 1) + f"\n\nThe hash '{commit_hash}' does not refer to a commit.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
except Exception:
|
|
content = self.m_heading("Error", 1) + "\n\nCould not verify commit object.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
content_parts = []
|
|
|
|
# Breadcrumb navigation
|
|
breadcrumb = f"{self.m_link("Node", self.PATH_INDEX)} / {self.m_link(group_name, self.PATH_GROUP, g=group_name)} / {self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name)} / {resolved_hash[:7]}"
|
|
content_parts.append(self.m_align(breadcrumb, "left") + "\n\n")
|
|
|
|
commit_info = self.get_commit_info(repo_path, resolved_hash)
|
|
if not commit_info:
|
|
content_parts.append(self.m_heading("Error", 1) + "\n\nCould not retrieve commit information.\n")
|
|
page_content = "".join(content_parts)
|
|
return self.render_template(page_content, st=st)
|
|
|
|
content_parts.append(self.m_heading(f"Commit {resolved_hash[:7]}", 1))
|
|
content_parts.append("\n")
|
|
|
|
# Commit metadata
|
|
content_parts.append(self.m_heading("Details", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(f" Hash: {resolved_hash}\n")
|
|
|
|
if commit_info.get("parents"):
|
|
parent_links = []
|
|
for parent_hash in commit_info["parents"]:
|
|
parent_link = self.m_link(parent_hash[:7], self.PATH_COMMIT, g=group_name, r=repo_name, h=parent_hash)
|
|
parent_links.append(parent_link)
|
|
|
|
content_parts.append(f" Parents: {' '.join(parent_links)}\n")
|
|
|
|
content_parts.append(f" Author: {self.m_escape(commit_info['author_name'])} <{self.m_escape(commit_info['author_email'])}>\n")
|
|
content_parts.append(f" Date: {commit_info['author_date']}\n")
|
|
|
|
if commit_info.get("committer_name") != commit_info.get("author_name"):
|
|
content_parts.append(f" Commit: {self.m_escape(commit_info['committer_name'])} <{self.m_escape(commit_info['committer_email'])}>\n")
|
|
content_parts.append(f" Date: {commit_info['committer_date']}\n")
|
|
|
|
content_parts.append("\n")
|
|
|
|
# Commit message
|
|
if commit_info.get("message"):
|
|
content_parts.append(self.m_heading("Message", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(self.m_escape(commit_info["message"]) + "\n")
|
|
content_parts.append("\n")
|
|
|
|
# Changed files
|
|
if commit_info.get("files"):
|
|
content_parts.append(self.m_heading("Changes", 2))
|
|
content_parts.append("\n")
|
|
|
|
total_additions = sum(f.get("additions", 0) for f in commit_info["files"])
|
|
total_deletions = sum(f.get("deletions", 0) for f in commit_info["files"])
|
|
|
|
content_parts.append(f" {len(commit_info['files'])} files changed, {total_additions} insertions(+), {total_deletions} deletions(-)\n\n")
|
|
|
|
for file_info in commit_info["files"]:
|
|
status = file_info.get("status", "M")
|
|
file_path = file_info.get("path", "unknown")
|
|
additions = file_info.get("additions", 0)
|
|
deletions = file_info.get("deletions", 0)
|
|
|
|
status_indicators = {"A": "`F0a0A`f", # Added - green
|
|
"D": "`F900D`f", # Deleted - red
|
|
"M": "`Faa0M`f", # Modified - yellow
|
|
"R": "`F0aaR`f" } # Renamed - cyan
|
|
|
|
status_display = status_indicators.get(status, status)
|
|
|
|
# File path as link to blob at this commit
|
|
file_link = self.m_link(self.m_escape(file_path), self.PATH_BLOB, g=group_name, r=repo_name, ref=resolved_hash, path=file_path)
|
|
|
|
stats = []
|
|
if additions > 0: stats.append(f"`F0a0+{additions}`f")
|
|
if deletions > 0: stats.append(f"`F900-{deletions}`f")
|
|
|
|
stats_str = " ".join(stats) if stats else ""
|
|
content_parts.append(f" {status_display} {file_link} {stats_str}\n")
|
|
|
|
content_parts.append("\n")
|
|
|
|
if self.SHOW_DIFF_BY_DEFAULT and commit_info.get("diff"):
|
|
content_parts.append(self.m_heading("Diff", 2))
|
|
content_parts.append("\n")
|
|
formatted_diff = self.format_diff(commit_info["diff"])
|
|
content_parts.append(f"{formatted_diff.lstrip()}")
|
|
|
|
# Navigation to tree at this commit
|
|
content_parts.append("\n")
|
|
content_parts.append(self.m_heading("Browse", 2))
|
|
content_parts.append("\n")
|
|
content_parts.append(f" {self.m_link('📁 Browse files at this commit', self.PATH_TREE, g=group_name, r=repo_name, ref=resolved_hash)}\n")
|
|
|
|
page_content = "".join(content_parts)
|
|
return self.render_template(page_content, st=st)
|
|
|
|
def serve_refs_page(self, path, data, request_id, link_id, remote_identity, requested_at):
|
|
st = time.time()
|
|
RNS.log(f"Refs page request from {remote_identity}", RNS.LOG_DEBUG)
|
|
|
|
group_name = data.get("var_g", "") if data else ""
|
|
repo_name = data.get("var_r", "") if data else ""
|
|
ref_type = data.get("var_type", "") if data else "" # "heads", "tags", or empty for both
|
|
|
|
repo = self.get_accessible_repository(remote_identity, group_name, repo_name)
|
|
if not repo:
|
|
content = self.m_heading("Not Found", 1) + "\n\nThe requested repository does not exist or you do not have access to it.\n"
|
|
return self.render_template(content, st=st)
|
|
|
|
repo_path = repo["path"]
|
|
|
|
content_parts = []
|
|
|
|
# Breadcrumb navigation
|
|
breadcrumb = f"{self.m_link("Node", self.PATH_INDEX)} / {self.m_link(group_name, self.PATH_GROUP, g=group_name)} / {self.m_link(repo_name, self.PATH_REPO, g=group_name, r=repo_name)} / refs"
|
|
content_parts.append(self.m_align(breadcrumb, "left") + "\n\n")
|
|
|
|
content_parts.append(self.m_heading("Refs", 1))
|
|
content_parts.append("\n")
|
|
|
|
# Get default branch (HEAD)
|
|
default_branch = None
|
|
try:
|
|
head_result = subprocess.run(["git", "symbolic-ref", "HEAD"],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if head_result.returncode == 0: default_branch = head_result.stdout.strip().replace("refs/heads/", "")
|
|
|
|
except Exception: pass
|
|
|
|
show_heads = not ref_type or ref_type == "heads"
|
|
show_tags = not ref_type or ref_type == "tags"
|
|
|
|
refs_data = self.get_refs_info(repo_path, default_branch)
|
|
|
|
if show_heads and refs_data.get("heads"):
|
|
content_parts.append(self.m_heading(f"Branches ({len(refs_data['heads'])})", 2))
|
|
content_parts.append("\n")
|
|
|
|
for ref_info in refs_data["heads"]:
|
|
branch_name = ref_info["name"]
|
|
short_hash = ref_info["short_hash"]
|
|
is_default = ref_info.get("is_default", False)
|
|
commit_subject = ref_info.get("commit_subject", "")
|
|
|
|
# Branch name with default indicator
|
|
name_display = f"`F0a0{branch_name}`f" if is_default else branch_name
|
|
default_marker = " `F0a0(default)`f" if is_default else ""
|
|
|
|
# Links to tree and commits at this branch
|
|
tree_link = self.m_link("tree", self.PATH_TREE, g=group_name, r=repo_name, ref=branch_name)
|
|
commits_link = self.m_link("commits", self.PATH_COMMITS, g=group_name, r=repo_name, ref=branch_name)
|
|
|
|
content_parts.append(f" {name_display}{default_marker}\n")
|
|
content_parts.append(f" {short_hash}: {self.m_escape(commit_subject)}\n")
|
|
content_parts.append(f" [{tree_link}] [{commits_link}]\n\n")
|
|
|
|
if show_tags and refs_data.get("tags"):
|
|
content_parts.append(self.m_heading(f"Tags ({len(refs_data['tags'])})", 2))
|
|
content_parts.append("\n")
|
|
|
|
for ref_info in refs_data["tags"]:
|
|
tag_name = ref_info["name"]
|
|
short_hash = ref_info["short_hash"]
|
|
is_annotated = ref_info.get("is_annotated", False)
|
|
tag_message = ref_info.get("tag_message", "")
|
|
commit_subject = ref_info.get("commit_subject", "")
|
|
|
|
# Tag name with annotated indicator
|
|
annotated_marker = " `Faa0(annotated)`f" if is_annotated else ""
|
|
|
|
# Links to tree and commits at this tag
|
|
tree_link = self.m_link("tree", self.PATH_TREE, g=group_name, r=repo_name, ref=tag_name)
|
|
commits_link = self.m_link("commits", self.PATH_COMMITS, g=group_name, r=repo_name, ref=tag_name)
|
|
|
|
content_parts.append(f" {tag_name}{annotated_marker}\n")
|
|
if is_annotated and tag_message: content_parts.append(f" Tag: {self.m_escape(tag_message[:60])}\n")
|
|
content_parts.append(f" {short_hash}: {self.m_escape(commit_subject)}\n")
|
|
content_parts.append(f" [{tree_link}] [{commits_link}]\n\n")
|
|
|
|
# No refs found
|
|
if (show_heads and not refs_data.get("heads")) and (show_tags and not refs_data.get("tags")):
|
|
content_parts.append("No refs found in this repository.\n")
|
|
|
|
content_parts.append("\n")
|
|
|
|
# Filtering links
|
|
content_parts.append(self.m_heading("Filter", 2))
|
|
content_parts.append("\n")
|
|
filter_links = []
|
|
filter_links.append(self.m_link("All", self.PATH_REFS, g=group_name, r=repo_name))
|
|
filter_links.append(self.m_link("Branches only", self.PATH_REFS, g=group_name, r=repo_name, type="heads"))
|
|
filter_links.append(self.m_link("Tags only", self.PATH_REFS, g=group_name, r=repo_name, type="tags"))
|
|
content_parts.append(" " + " | ".join(filter_links) + "\n")
|
|
|
|
page_content = "".join(content_parts)
|
|
return self.render_template(page_content, st=st)
|
|
|
|
#######################
|
|
# Git Data Extraction #
|
|
#######################
|
|
|
|
def get_repository_description(self, repo_path):
|
|
try:
|
|
# Try git config first
|
|
result = subprocess.run(["git", "config", "--get", "repository.description"],
|
|
cwd=repo_path, capture_output=True, text=True, check=False)
|
|
|
|
if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip()
|
|
|
|
# Fall back to description file
|
|
desc_path = f"{repo_path}.description"
|
|
if os.path.isfile(desc_path):
|
|
with open(desc_path, "r") as f:
|
|
desc = f.read().strip()
|
|
if desc: return desc
|
|
|
|
except Exception as e: RNS.log(f"Error getting repository description: {e}", RNS.LOG_DEBUG)
|
|
|
|
return None
|
|
|
|
def get_repository_refs(self, repo_path):
|
|
refs = {"heads": [], "tags": []}
|
|
|
|
try:
|
|
# Get all refs with their hashes
|
|
result = subprocess.run(["git", "for-each-ref", "--format", "%(objectname) %(refname) %(refname:short)", "refs/heads", "refs/tags"],
|
|
cwd=repo_path, capture_output=True, text=True, check=False)
|
|
|
|
if result.returncode != 0: return refs
|
|
|
|
for line in result.stdout.strip().split("\n"):
|
|
if not line.strip(): continue
|
|
parts = line.split(" ", 2)
|
|
if len(parts) >= 2:
|
|
full_hash = parts[0]
|
|
ref_name = parts[1]
|
|
short_name = parts[2] if len(parts) > 2 else ref_name
|
|
short_hash = full_hash[:7]
|
|
|
|
ref_info = { "name": short_name,
|
|
"hash": full_hash,
|
|
"short_hash": short_hash }
|
|
|
|
if ref_name.startswith("refs/heads/"): refs["heads"].append(ref_info)
|
|
elif ref_name.startswith("refs/tags/"): refs["tags"].append(ref_info)
|
|
|
|
except Exception as e: RNS.log(f"Error getting repository refs: {e}", RNS.LOG_DEBUG)
|
|
|
|
return refs
|
|
|
|
def resolve_ref(self, repo_path, ref):
|
|
try:
|
|
result = subprocess.run(["git", "rev-parse", "--verify", ref],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if result.returncode == 0:
|
|
hash_val = result.stdout.strip()
|
|
# Validate it's a 40-char hex string
|
|
if len(hash_val) == 40 and all(c in "0123456789abcdef" for c in hash_val.lower()): return hash_val.lower()
|
|
|
|
except subprocess.TimeoutExpired: RNS.log(f"Timeout resolving ref '{ref}'", RNS.LOG_WARNING)
|
|
except Exception as e: RNS.log(f"Error resolving ref: {e}", RNS.LOG_WARNING)
|
|
return None
|
|
|
|
def get_tree_entries(self, repo_path, ref, path):
|
|
entries = []
|
|
tree_path = path.strip("/") if path else ""
|
|
|
|
try:
|
|
# Use git ls-tree to list directory contents
|
|
ls_tree_path = f"{ref}:{tree_path}" if tree_path else ref
|
|
result = subprocess.run(["git", "ls-tree", "-l", ls_tree_path],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if result.returncode != 0:
|
|
# Check if it's actually a tree
|
|
cat_result = subprocess.run(["git", "cat-file", "-t", ls_tree_path],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if cat_result.returncode != 0 or cat_result.stdout.strip() != "tree": return None # Not a valid tree
|
|
return [] # Valid but empty tree
|
|
|
|
for line in result.stdout.strip().split("\n"):
|
|
if not line.strip(): continue
|
|
|
|
# Parse ls-tree output: <mode> <type> <sha> <size>\t<name>
|
|
# Format: 100644 blob abc123 1234\tfilename
|
|
parts = line.split("\t", 1)
|
|
if len(parts) != 2: continue
|
|
|
|
meta_part = parts[0]
|
|
name = parts[1]
|
|
|
|
meta_parts = meta_part.split()
|
|
if len(meta_parts) < 3: continue
|
|
|
|
mode = meta_parts[0]
|
|
obj_type = meta_parts[1]
|
|
# sha = meta_parts[2]
|
|
size = 0
|
|
if len(meta_parts) >= 4:
|
|
try: size = int(meta_parts[3])
|
|
except ValueError: size = 0
|
|
|
|
entry = { "name": name,
|
|
"type": obj_type, # blob, tree, or commit
|
|
"mode": mode,
|
|
"size": size,
|
|
"link_target": None }
|
|
|
|
# Detect symlinks (mode 120000) and get symlink target
|
|
if mode == "120000":
|
|
entry["type"] = "link"
|
|
try:
|
|
symlink_result = subprocess.run(["git", "show", f"{ref}:{tree_path}/{name}" if tree_path else f"{ref}:{name}"],
|
|
cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if symlink_result.returncode == 0: entry["link_target"] = symlink_result.stdout.strip()
|
|
|
|
except Exception: pass
|
|
|
|
entries.append(entry)
|
|
|
|
except subprocess.TimeoutExpired:
|
|
RNS.log(f"Timeout listing tree contents", RNS.LOG_WARNING)
|
|
return None
|
|
|
|
except Exception as e:
|
|
RNS.log(f"Error getting tree entries: {e}", RNS.LOG_WARNING)
|
|
return None
|
|
|
|
return entries
|
|
|
|
def get_blob_info(self, repo_path, ref, path):
|
|
file_path = path.strip("/")
|
|
|
|
try:
|
|
# Get object info
|
|
result = subprocess.run(["git", "cat-file", "-s", f"{ref}:{file_path}"],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if result.returncode != 0: return None
|
|
|
|
size = int(result.stdout.strip())
|
|
|
|
# Check if it's a symlink via ls-tree
|
|
parent_dir = "/".join(file_path.split("/")[:-1])
|
|
filename = file_path.split("/")[-1]
|
|
|
|
ls_tree_path = f"{ref}:{parent_dir}" if parent_dir else ref
|
|
result = subprocess.run(["git", "ls-tree", ls_tree_path],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
is_symlink = False
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split("\n"):
|
|
if f"\t{filename}" in line and line.startswith("120000"):
|
|
is_symlink = True
|
|
break
|
|
|
|
# Get symlink target if applicable
|
|
symlink_target = None
|
|
if is_symlink:
|
|
content_result = subprocess.run(["git", "show", f"{ref}:{file_path}"],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if content_result.returncode == 0: symlink_target = content_result.stdout.strip()
|
|
|
|
# Binary detection using git diff --numstat
|
|
is_binary = False
|
|
if not is_symlink:
|
|
# Try to get a sample of the file to check for null bytes
|
|
sample_result = subprocess.run(["git", "show", f"{ref}:{file_path}"],
|
|
cwd=repo_path, capture_output=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if sample_result.returncode == 0:
|
|
# Check for null bytes in first 8KB
|
|
sample = sample_result.stdout[:8192]
|
|
if b'\x00' in sample: is_binary = True
|
|
else:
|
|
# Also check using git diff approach for text encoding issues
|
|
diff_result = subprocess.run(["git", "diff", "--numstat", "--no-index", "--", "/dev/null", f"{ref}:{file_path}"],
|
|
cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
# If git says it's binary, the line will start with "-" "-"
|
|
if diff_result.returncode == 1: # git diff returns 1 for differences
|
|
first_line = diff_result.stdout.strip().split("\n")[0] if diff_result.stdout else ""
|
|
if first_line.startswith("-"): is_binary = True
|
|
|
|
return { "size": size,
|
|
"is_binary": is_binary,
|
|
"is_symlink": is_symlink,
|
|
"symlink_target": symlink_target }
|
|
|
|
except subprocess.TimeoutExpired:
|
|
RNS.log(f"Timeout getting blob info", RNS.LOG_DEBUG)
|
|
return None
|
|
|
|
except Exception as e:
|
|
RNS.log(f"Error getting blob info: {e}", RNS.LOG_DEBUG)
|
|
return None
|
|
|
|
def get_blob_content(self, repo_path, ref, path):
|
|
file_path = path.strip("/")
|
|
|
|
try:
|
|
result = subprocess.run(["git", "show", f"{ref}:{file_path}"],
|
|
cwd=repo_path, capture_output=True, text=True,
|
|
timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if result.returncode == 0: return result.stdout
|
|
|
|
except subprocess.TimeoutExpired: RNS.log(f"Timeout getting blob content", RNS.LOG_WARNING)
|
|
except Exception as e: RNS.log(f"Error getting blob content: {e}", RNS.LOG_WARNING)
|
|
|
|
return None
|
|
|
|
def get_refs_info(self, repo_path, default_branch=None):
|
|
refs = {"heads": [], "tags": []}
|
|
|
|
try:
|
|
# Get all refs with their hashes and commit info
|
|
# Format: objectname refname refname:short subject
|
|
result = subprocess.run(["git", "for-each-ref", "--format=%(objectname)|%(refname)|%(refname:short)|%(subject)", "refs/heads", "refs/tags"],
|
|
cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if result.returncode != 0: return refs
|
|
|
|
for line in result.stdout.strip().split("\n"):
|
|
if not line.strip(): continue
|
|
|
|
parts = line.split("|", 3)
|
|
if len(parts) < 3: continue
|
|
|
|
full_hash = parts[0]
|
|
ref_name = parts[1]
|
|
short_name = parts[2]
|
|
subject = parts[3] if len(parts) > 3 else ""
|
|
short_hash = full_hash[:7]
|
|
|
|
ref_info = { "name": short_name,
|
|
"hash": full_hash,
|
|
"short_hash": short_hash,
|
|
"commit_subject": subject,
|
|
"is_default": short_name == default_branch,
|
|
"is_annotated": False,
|
|
"tag_message": None }
|
|
|
|
if ref_name.startswith("refs/heads/"): refs["heads"].append(ref_info)
|
|
|
|
elif ref_name.startswith("refs/tags/"):
|
|
# Check if it's an annotated tag
|
|
try:
|
|
# Get the object type this tag points to
|
|
tag_result = subprocess.run(["git", "for-each-ref", "--format=%(objecttype)|%(contents:subject)", ref_name],
|
|
cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if tag_result.returncode == 0:
|
|
tag_parts = tag_result.stdout.strip().split("|", 1)
|
|
if len(tag_parts) >= 2:
|
|
obj_type = tag_parts[0]
|
|
if obj_type == "tag":
|
|
ref_info["is_annotated"] = True
|
|
ref_info["tag_message"] = tag_parts[1]
|
|
|
|
except Exception: pass
|
|
|
|
refs["tags"].append(ref_info)
|
|
|
|
except subprocess.TimeoutExpired: RNS.log(f"Timeout getting refs info", RNS.LOG_WARNING)
|
|
except Exception as e: RNS.log(f"Error getting refs info: {e}", RNS.LOG_WARNING)
|
|
|
|
return refs
|
|
|
|
def get_commits(self, repo_path, ref, file_path, skip, limit):
|
|
commits = []
|
|
|
|
try:
|
|
cmd = ["git", "log", "--format=%H|%s|%an|%ae|%at", "--skip", str(skip), "-n", str(limit), ref]
|
|
if file_path: cmd.extend(["--", file_path])
|
|
|
|
result = subprocess.run(cmd, cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if result.returncode != 0: return None
|
|
|
|
for line in result.stdout.strip().split("\n"):
|
|
if not line.strip(): continue
|
|
|
|
parts = line.split("|", 4)
|
|
if len(parts) >= 5:
|
|
commits.append({ "hash": parts[0],
|
|
"subject": parts[1],
|
|
"author": parts[2],
|
|
"author_email": parts[3],
|
|
"timestamp": int(parts[4]) })
|
|
|
|
except subprocess.TimeoutExpired:
|
|
RNS.log(f"Timeout getting commits", RNS.LOG_DEBUG)
|
|
return None
|
|
|
|
except Exception as e:
|
|
RNS.log(f"Error getting commits: {e}", RNS.LOG_DEBUG)
|
|
return None
|
|
|
|
return commits
|
|
|
|
def get_commit_info(self, repo_path, commit_hash):
|
|
try:
|
|
# Get commit metadata
|
|
format_str = "%P%n%an%n%ae%n%aI%n%cn%n%ce%n%cI%n%B"
|
|
result = subprocess.run(["git", "show", "--no-patch", "--format=" + format_str, commit_hash],
|
|
cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if result.returncode != 0: return None
|
|
|
|
lines = result.stdout.split("\n")
|
|
if len(lines) < 7: return None
|
|
|
|
# Parse parents (space-separated hashes on first line)
|
|
parents = lines[0].strip().split() if lines[0].strip() else []
|
|
|
|
info = { "parents": parents,
|
|
"author_name": lines[1],
|
|
"author_email": lines[2],
|
|
"author_date": lines[3],
|
|
"committer_name": lines[4],
|
|
"committer_email": lines[5],
|
|
"committer_date": lines[6],
|
|
"message": "\n".join(lines[7:]).strip(),
|
|
"files": [],
|
|
"diff": None }
|
|
|
|
# Get file change statistics
|
|
stats_result = subprocess.run(["git", "diff-tree", "--numstat", "-r", commit_hash],
|
|
cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if stats_result.returncode == 0:
|
|
for line in stats_result.stdout.strip().split("\n"):
|
|
if not line.strip(): continue
|
|
|
|
# Parse numstat output: <additions>\t<deletions>\t<path>
|
|
parts = line.split("\t")
|
|
if len(parts) >= 3:
|
|
additions = parts[0]
|
|
deletions = parts[1]
|
|
file_path = parts[2]
|
|
|
|
# Detect status based on additions/deletions pattern
|
|
status = "M" # Modified by default
|
|
if additions == "-" and deletions == "-":
|
|
status = "R" # Renamed (binary or unmerged)
|
|
additions = "0"
|
|
deletions = "0"
|
|
|
|
# Check for added/deleted via --diff-filter would require second call
|
|
# Instead, we'll use a simpler approach: check if file exists in parent
|
|
if info["parents"]:
|
|
parent = info["parents"][0]
|
|
# Check if file exists in parent
|
|
check_result = subprocess.run(["git", "cat-file", "-e", f"{parent}:{file_path}"],
|
|
cwd=repo_path, capture_output=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if check_result.returncode != 0: status = "A" # Added (didn't exist in parent)
|
|
else:
|
|
# Check if file exists in current commit
|
|
check_current = subprocess.run(["git", "cat-file", "-e", f"{commit_hash}:{file_path}"],
|
|
cwd=repo_path, capture_output=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if check_current.returncode != 0: status = "D" # Deleted (doesn't exist in current)
|
|
|
|
try:
|
|
add_count = int(additions) if additions != "-" else 0
|
|
del_count = int(deletions) if deletions != "-" else 0
|
|
|
|
except ValueError:
|
|
add_count = 0
|
|
del_count = 0
|
|
|
|
info["files"].append({ "path": file_path,
|
|
"status": status,
|
|
"additions": add_count,
|
|
"deletions": del_count })
|
|
|
|
# Get diff if enabled
|
|
if self.SHOW_DIFF_BY_DEFAULT:
|
|
diff_result = subprocess.run(["git", "show", "--format=", commit_hash],
|
|
cwd=repo_path, capture_output=True, text=True, timeout=self.GIT_COMMAND_TIMEOUT, check=False)
|
|
|
|
if diff_result.returncode == 0: info["diff"] = diff_result.stdout
|
|
|
|
return info
|
|
|
|
except subprocess.TimeoutExpired:
|
|
RNS.log(f"Timeout getting commit info", RNS.LOG_WARNING)
|
|
return None
|
|
|
|
except Exception as e:
|
|
RNS.log(f"Error getting commit info: {e}", RNS.LOG_WARNING)
|
|
return None
|
|
|
|
def get_readme_content(self, repo_path):
|
|
readme_names = [ ("README.md", True), ("README.rst", False), ("README.txt", False), ("README", False),
|
|
("readme.md", True), ("readme.rst", False), ("readme.txt", False), ("readme", False) ]
|
|
|
|
for readme_name, is_markdown in readme_names:
|
|
try:
|
|
result = subprocess.run(["git", "show", f"HEAD:{readme_name}"],
|
|
cwd=repo_path, capture_output=True, text=True, check=False)
|
|
|
|
if result.returncode == 0: return result.stdout, is_markdown
|
|
|
|
except Exception: continue
|
|
|
|
return None, False
|
|
|
|
###################
|
|
# Utility Methods #
|
|
###################
|
|
|
|
def format_size(self, size_bytes): return RNS.prettysize(size_bytes)
|
|
|
|
def format_relative_time(self, timestamp):
|
|
now = time.time()
|
|
diff = now - timestamp
|
|
|
|
if diff < 60:
|
|
return "just now"
|
|
elif diff < 3600:
|
|
minutes = int(diff / 60)
|
|
return f"{minutes} minute{'s' if minutes != 1 else ''} ago"
|
|
elif diff < 86400:
|
|
hours = int(diff / 3600)
|
|
return f"{hours} hour{'s' if hours != 1 else ''} ago"
|
|
elif diff < 604800:
|
|
days = int(diff / 86400)
|
|
return f"{days} day{'s' if days != 1 else ''} ago"
|
|
elif diff < 2592000:
|
|
weeks = int(diff / 604800)
|
|
return f"{weeks} week{'s' if weeks != 1 else ''} ago"
|
|
elif diff < 31536000:
|
|
months = int(diff / 2592000)
|
|
return f"{months} month{'s' if months != 1 else ''} ago"
|
|
else:
|
|
years = int(diff / 31536000)
|
|
return f"{years} year{'s' if years != 1 else ''} ago"
|
|
|
|
def format_diff(self, diff_text: str) -> str:
|
|
lines = diff_text.split("\n")
|
|
formatted_lines = []
|
|
|
|
for line in lines:
|
|
if line.startswith("+"):
|
|
if line.startswith("+++"): formatted_lines.append(self.m_escape(line))
|
|
else: formatted_lines.append(f"`F0a0{self.m_escape(line)}`f")
|
|
|
|
elif line.startswith("-"):
|
|
if line.startswith("---"): formatted_lines.append(self.m_escape(f"\\{line}"))
|
|
else: formatted_lines.append(f"`F900{self.m_escape(line)}`f")
|
|
elif line.startswith("@@"):
|
|
formatted_lines.append(f"`F0aa{self.m_escape(line)}`f")
|
|
|
|
elif line.startswith("diff ") or line.startswith("index ") or line.startswith("new file") or line.startswith("deleted file"):
|
|
if line.startswith("diff --git a"): formatted_lines.append("")
|
|
formatted_lines.append(f"`F666{self.m_escape(line)}`f")
|
|
|
|
else: formatted_lines.append(self.m_escape(line))
|
|
|
|
return "\n".join(formatted_lines)
|
|
|
|
#######################
|
|
# Connection Handlers #
|
|
#######################
|
|
|
|
def remote_connected(self, link):
|
|
RNS.log(f"Peer connected to {self.destination}", RNS.LOG_DEBUG)
|
|
link.set_remote_identified_callback(self.remote_identified)
|
|
link.set_link_closed_callback(self.remote_disconnected)
|
|
|
|
def remote_disconnected(self, link):
|
|
RNS.log(f"Peer disconnected from {self.destination}", RNS.LOG_DEBUG)
|
|
|
|
def remote_identified(self, link, identity):
|
|
RNS.log(f"Peer identified as {link.get_remote_identity()} on {link}", RNS.LOG_DEBUG)
|
|
|
|
######################
|
|
# Permission Control #
|
|
######################
|
|
|
|
def get_accessible_groups(self, remote_identity):
|
|
accessible_groups = {}
|
|
|
|
for group_name, group_data in self.owner.groups.items():
|
|
accessible_repos = self.get_accessible_repositories(remote_identity, group_name)
|
|
|
|
if accessible_repos:
|
|
accessible_groups[group_name] = { "path": group_data["path"], "repositories": accessible_repos }
|
|
|
|
return accessible_groups
|
|
|
|
def get_accessible_repositories(self, remote_identity, group_name):
|
|
if group_name not in self.owner.groups: return {}
|
|
|
|
group_data = self.owner.groups[group_name]
|
|
all_repos = group_data.get("repositories", {})
|
|
accessible_repos = {}
|
|
|
|
for repo_name, repo_data in all_repos.items():
|
|
if self.resolve_permission(remote_identity, group_name, repo_name, self.owner.PERM_READ):
|
|
accessible_repos[repo_name] = repo_data
|
|
|
|
return accessible_repos
|
|
|
|
def get_accessible_repository(self, remote_identity, group_name, repo_name):
|
|
if group_name not in self.owner.groups: return None
|
|
|
|
group_data = self.owner.groups[group_name]
|
|
all_repos = group_data.get("repositories", {})
|
|
|
|
if repo_name not in all_repos: return None
|
|
|
|
if self.resolve_permission(remote_identity, group_name, repo_name, self.owner.PERM_READ):
|
|
return all_repos[repo_name]
|
|
|
|
return None
|
|
|
|
# Global base template
|
|
DEFAULT_BASE_TEMPLATE = """#!c=0
|
|
>{NODE_NAME}
|
|
|
|
{NAVIGATION}
|
|
{PAGE_CONTENT}
|
|
<
|
|
-
|
|
`a`F666`[Served by rngit {VERSION}`:/page/index.mu] - {GEN_TIME}`f"""
|
|
|
|
# Front page template
|
|
DEFAULT_FRONT_TEMPLATE = """>Groups
|
|
|
|
{PAGE_CONTENT}"""
|
|
|
|
# Repositories page template
|
|
DEFAULT_GROUP_TEMPLATE = """{PAGE_CONTENT}""" |