Improved ref name validation in rngit

This commit is contained in:
Mark Qvist
2026-05-08 16:07:16 +02:00
parent d402ee33a2
commit e923ccbf1b
+74 -25
View File
@@ -1717,10 +1717,12 @@ class ReticulumGitNode():
# Check for_push permission if requested
for_push = data.get("for_push", False)
if for_push: access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_WRITE)
else: access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_READ)
read_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_READ)
write_access = self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_WRITE)
if for_push: access = write_access
else: access = read_access
if not access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
if not access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not allowed" if read_access else self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found"
else:
repository_path = self.groups[group_name]["repositories"][repository_name]["path"]
@@ -1788,7 +1790,8 @@ class ReticulumGitNode():
if not refs: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No refs specified"
try:
ref_names = [r["ref"] for r in refs]
ref_names = san_refs([r["ref"] for r in refs])
if not ref_names: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
RNS.log(f"Fetching refs {ref_names} for {group_name}/{repository_name}", RNS.LOG_DEBUG)
if not hasattr(link, "temporary_directories"): link.temporary_directories = []
@@ -1801,14 +1804,16 @@ class ReticulumGitNode():
execv = ["git", "bundle", "create", "--no-progress", bundle_path]
for r in refs:
execv.append(r["ref"])
# Per-ref have: The client already has this ancestor,
# so the server can exclude objects reachable from it.
if "have" in r and r["have"]:
have_sha = r["have"]
cat_result = subprocess.run(["git", "cat-file", "-t", have_sha], cwd=repository_path, capture_output=True, check=False)
if cat_result.returncode == 0: execv.append(f"^{have_sha}")
else: RNS.log(f"Client have-sha {have_sha} not found in repository, skipping", RNS.LOG_WARNING)
if not san_ref(r["ref"]): return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
else:
execv.append(r["ref"])
# Per-ref have: The client already has this ancestor,
# so the server can exclude objects reachable from it.
if "have" in r and r["have"]:
have_sha = r["have"]
cat_result = subprocess.run(["git", "cat-file", "-t", have_sha], cwd=repository_path, capture_output=True, check=False)
if cat_result.returncode == 0: execv.append(f"^{have_sha}")
else: RNS.log(f"Client have-sha {have_sha} not found in repository, skipping", RNS.LOG_WARNING)
# Global have list: SHAs of objects the client already has.
# Exclude objects reachable from these to produce thin bundles.
@@ -1847,8 +1852,8 @@ class ReticulumGitNode():
if not write_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found" if not read_access else self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
else:
repository_path = self.groups[group_name]["repositories"][repository_name]["path"]
local_ref = data.get("local_ref", "")
remote_ref = data.get("remote_ref", "")
local_ref = san_ref(data.get("local_ref", ""))
remote_ref = san_ref(data.get("remote_ref", ""))
force = data.get("force", False)
bundle_data = data.get("bundle", None)
operations = data.get("operations", None)
@@ -1888,13 +1893,16 @@ class ReticulumGitNode():
try:
for op in operations:
action = op.get("action", "")
ref = op.get("ref", "")
ref = san_ref(op.get("ref", ""))
sha = op.get("sha", "")
op_force = op.get("force", False)
if action != "update_ref": return self.RES_INVALID_REQ.to_bytes(1, "big") + f"Unknown operation: {action}".encode("utf-8")
if not ref.startswith("refs/"): return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid ref"
if not ref: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
if not ref.startswith("refs/"): return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
if not sha or len(sha) < 40: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid SHA"
try: bytes.fromhex(sha)
except: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid SHA"
# Verify the target object exists in the repository
cat_result = subprocess.run(["git", "cat-file", "-t", sha], cwd=repository_path, capture_output=True, check=False)
@@ -1935,9 +1943,10 @@ class ReticulumGitNode():
if not write_access: return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Not found" if not read_access else self.RES_DISALLOWED.to_bytes(1, "big") + b"Not allowed"
else:
repository_path = self.groups[group_name]["repositories"][repository_name]["path"]
ref_to_delete = data.get("ref", "")
ref_to_delete = san_ref(data.get("ref", ""))
if not ref_to_delete.startswith("refs/"): return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid ref"
if not ref_to_delete: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
if not ref_to_delete.startswith("refs/"): return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
try:
RNS.log(f"Deleting ref {ref_to_delete} in {group_name}/{repository_name}", RNS.LOG_DEBUG)
execv = ["git", "update-ref", "-d", ref_to_delete]
@@ -2114,8 +2123,8 @@ class ReticulumGitNode():
return None
def _release_view(self, releases_path, data):
tag = data.get("tag")
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No tag specified"
tag = san_ref(data.get("tag"))
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid tag specified"
tag = os.path.basename(tag)
release_dir = os.path.join(releases_path, tag)
@@ -2137,12 +2146,12 @@ class ReticulumGitNode():
else: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
def _release_create_init(self, releases_path, repository_path, data, remote_identity):
tag = data.get("tag")
tag = san_ref(data.get("tag"))
commit_hash = data.get("hash")
notes = data.get("notes", "")
notes_format = data.get("notes_format", "markdown") # "markdown" or "micron"
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No tag specified"
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid tag specified"
tag = os.path.basename(tag)
if not tag or tag in [".", ".."]: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid tag name"
@@ -2186,7 +2195,7 @@ class ReticulumGitNode():
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def _release_create_artifact(self, releases_path, data):
tag = data.get("tag")
tag = san_ref(data.get("tag"))
artifact_name = data.get("artifact_name")
artifact_data = data.get("artifact_data")
@@ -2222,7 +2231,7 @@ class ReticulumGitNode():
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def _release_create_finalize(self, releases_path, data):
tag = data.get("tag")
tag = san_ref(data.get("tag"))
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No tag specified"
@@ -2250,7 +2259,7 @@ class ReticulumGitNode():
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def _release_delete(self, releases_path, data):
tag = data.get("tag")
tag = san_ref(data.get("tag"))
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No tag specified"
@@ -3078,4 +3087,44 @@ COMMENT_TEMPLATE = "# Remove this line and enter your update. Save and exit when
CREATE_DOC_TEMPLATE = "# Remove this line and enter your document content. Save and exit when done, or save an empty document to abort abort."
DOC_PERMISSIONS_TEMPLATE ="# No permissions are currently defined for this workdoc. Add them below, and save and exit when you are done."
# Validate ref names according to https://git-scm.com/docs/git-check-ref-format
# This may be a bit overkill, since git validates names as well, but why not.
def san_ref(ref):
print(ref)
if ref.startswith("-"): return None
if ref.startswith("/"): return None
if ref.endswith("/"): return None
if ref.endswith("."): return None
if " " in ref: return None
if not "/" in ref: return None
if ".." in ref: return None
if "/." in ref: return None
if "//" in ref: return None
if "\\" in ref: return None
for comp in ref.split("/"):
if comp.endswith(".lock"): return None
if not all(ord(c) >= 40 for c in ref): return None # Any control character
if "\x7f" in ref: return None # ASCII DEL (177)
if "~" in ref: return None
if "^" in ref: return None
if ":" in ref: return None
if "?" in ref: return None
if "*" in ref: return None
if "[" in ref: return None
if "@{" in ref: return None
if "@" == ref: return None
return ref
def san_refs(refs):
if not type(refs) == list: return None
for ref in refs:
if not san_ref(ref): return None
return refs
if __name__ == "__main__": main()