Added rngit release management

This commit is contained in:
Mark Qvist
2026-05-04 20:14:39 +02:00
parent 3de16e085e
commit 15ec64e974
+606 -21
View File
@@ -32,12 +32,14 @@ import RNS
import os
import sys
import time
import shutil
import argparse
import threading
import subprocess
from threading import Lock
from tempfile import TemporaryDirectory
from tempfile import NamedTemporaryFile
from RNS._version import __version__
from RNS.Utilities.rngit import APP_NAME
@@ -139,6 +141,21 @@ def main():
class ReticulumGitClient():
PROTO_SPEC = "rns://"
PATH_LIST = "/git/list"
PATH_FETCH = "/git/fetch"
PATH_PUSH = "/git/push"
PATH_DELETE = "/git/delete"
PATH_RELEASE = "/mgmt/release"
RES_OK = 0x00
RES_DISALLOWED = 0x01
RES_INVALID_REQ = 0x02
RES_NOT_FOUND = 0x03
RES_REMOTE_FAIL = 0xFF
IDX_REPOSITORY = 0x00
IDX_RESULT_CODE = 0x01
PATH_TIMEOUT = 15
LINK_TIMEOUT = 15
@@ -151,6 +168,12 @@ class ReticulumGitClient():
self.link_timeout = self.LINK_TIMEOUT
self._should_run = True
self.link_ready = False
self.link_failed = False
self.request_event = threading.Event()
self.request_response = None
self.response_metadata = None
if not ReticulumGitNode._ensure_git(): RNS.log("The \"git\" command is not available. Aborting server startup.", RNS.LOG_ERROR)
else:
if configdir != None: self.configdir = configdir
@@ -165,11 +188,11 @@ class ReticulumGitClient():
if not os.path.isfile(self.identitypath):
identity = RNS.Identity()
identity.to_file(self.identitypath)
print(f"Identity generated and persisted to {self.identitypath}")
RNS.log(f"Identity generated and persisted to {self.identitypath}", RNS.LOG_DEBUG)
else:
identity = RNS.Identity.from_file(self.identitypath)
print(f"Client identity loaded from {self.identitypath}")
RNS.log(f"Client identity loaded from {self.identitypath}", RNS.LOG_DEBUG)
if not identity: self.abort("Could not initialize client identity")
else: self.identity = identity
@@ -188,7 +211,7 @@ class ReticulumGitClient():
def connect_remote(self, remote):
destination_hash, group, repo = self.parse_remote_url(remote)
print(f"Requesting path...", end="")
print(f"Requesting path... ", end="")
if not RNS.Transport.await_path(destination_hash, timeout=self.path_timeout):
print(f"\n", end="")
self.abort(f"Could not resolve path to {RNS.prettyhexrep(destination_hash)}")
@@ -198,48 +221,335 @@ class ReticulumGitClient():
self.remote_identity = RNS.Identity.recall(destination_hash)
if not self.remote_identity: self.abort("Could not recall remote identity")
print(f"\rEstablishing link...")
print(f"\rEstablishing link... ", end="")
self.destination = RNS.Destination(self.remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "repositories")
self.link = RNS.Link(self.destination)
self.link.set_link_established_callback(self.link_established)
self.link.set_link_closed_callback(self.link_closed)
def link_established(self, link):
print(f"\rLink established ", end="")
link.identify(self.identity)
self.link_ready = True
def link_closed(self, link):
print("The link to the remote was closed")
if not self.link_ready: self.link_failed = True
################################
# Synchronous Request Wrappers #
################################
def _response_ready(self, request_receipt):
self.request_response = request_receipt.response
self.response_metadata = request_receipt.metadata
self.request_event.set()
def _response_failed(self, request_receipt=None):
self.request_response = None
self.request_event.set()
def send_request(self, path, data, timeout=120):
if not self.link_ready: self.abort("Link not ready at request time")
self.request_event.clear()
self.request_response = None
self.response_metadata = None
RNS.log(f"Sending request: {path}", RNS.LOG_DEBUG)
request_receipt = self.link.request(path, data, response_callback=self._response_ready, failed_callback=self._response_failed, timeout=timeout)
self.request_event.wait(timeout=timeout)
if self.request_response is None: self.abort("Request failed or timed out")
RNS.log(f"Got response for: {path}", RNS.LOG_DEBUG)
return self.request_response, self.response_metadata
def _edit_release_notes(self, tag="this release"):
editor = os.environ.get("EDITOR", "")
if not editor:
# Try common fallbacks
for fallback in ["nano", "vim", "vi"]:
try:
subprocess.run(["which", fallback], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
editor = fallback
break
except subprocess.CalledProcessError:
continue
if not editor:
print("No editor found. Please set $EDITOR environment variable.")
return None
template = RELEASE_NOTES_TEMPLATE.replace("{TAG}", tag)
try:
with NamedTemporaryFile(mode="w+", suffix=".md", delete=False) as tmp:
tmp_path = tmp.name
tmp.write(template)
result = subprocess.run([editor, tmp_path])
if result.returncode != 0:
print(f"Editor exited with error code {result.returncode}")
os.unlink(tmp_path)
return None
with open(tmp_path, "r") as f: content = f.read()
os.unlink(tmp_path)
lines = [line for line in content.split("\n") if not line.strip().startswith("#")]
notes = "\n".join(lines).strip()
if not notes: return None
return notes
except Exception as e:
RNS.log(f"Error getting release notes: {e}", RNS.LOG_ERROR)
return None
def list_releases(self, remote=None):
if not remote: print(f"No remote specified"); exit(1)
self.connect_remote(remote)
timeout = self.link_timeout
while not self.link_ready and not self.link_failed and timeout > 0:
time.sleep(0.1)
timeout -= 1
if not self.link_ready: self.abort("Link establishment failed")
try:
destination_hash, group, repo = self.parse_remote_url(remote)
repo_path = f"{group}/{repo}"
request_data = {self.IDX_REPOSITORY: repo_path, "operation": "list"}
response, metadata = self.send_request(self.PATH_RELEASE, request_data, timeout=30)
print("\r \r", end="")
# TODO: Implement release listing
pass
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Server error: {error_msg}")
if len(response) > 1: releases = mp.unpackb(response[1:])
else: releases = []
if not releases: print("No releases for this repository")
else:
print(f"{'Tag':<10} {'Status':<10} {'Created':<17} {'Objs':<5} Notes")
print("-" * 80)
for rel in releases:
tag = rel.get("tag", "unknown")[:10]
status = rel.get("status", "unknown")[:9]
created_ts = rel.get("created", 0)
created = time.strftime("%Y-%m-%d %H:%M", time.localtime(created_ts)) if created_ts else "unknown"
artifacts = str(rel.get("artifacts", 0))
preview = rel.get("preview", "")[:34]
print(f"{tag:<10} {status:<10} {created:<17} {artifacts:<5} {preview}")
print()
except Exception as e: self.abort(f"Error listing releases: {e}")
finally:
if self.link: self.link.teardown()
def view_release(self, remote=None, target=None):
if not remote: print(f"No remote specified"); exit(1)
if not target: print(f"No target specified"); exit(1)
self.connect_remote(remote)
# TODO: Implement release listing
pass
timeout = self.link_timeout
while not self.link_ready and not self.link_failed and timeout > 0:
time.sleep(0.5)
timeout -= 1
if not self.link_ready: self.abort("Link establishment failed")
try:
destination_hash, group, repo = self.parse_remote_url(remote)
repo_path = f"{group}/{repo}"
request_data = {self.IDX_REPOSITORY: repo_path, "operation": "view", "tag": target}
response, metadata = self.send_request(self.PATH_RELEASE, request_data, timeout=30)
print("\r \r", end="")
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Remote error: {error_msg}")
if len(response) <= 1: self.abort("Empty response from remote")
release = mp.unpackb(response[1:])
print(f"Release : {release.get('tag', target)}")
print(f"Status : {release.get('status', 'unknown')}")
created_ts = release.get('created', 0)
if created_ts: print(f"Created : {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(created_ts))}")
print(f"Thanks : {release.get('thanks', 0)}")
notes = release.get('notes', '')
if notes:
print("\nRelease Notes")
print("=============\n")
print(notes)
artifacts = release.get('artifacts', [])
if artifacts:
artifacts_str = f"Artifacts ({len(artifacts)})"
print(f"\n{artifacts_str}")
print("="*len(artifacts_str))
for a in artifacts:
size = a.get('size', 0)
size_str = RNS.prettysize(size) if size else "0 B"
print(f" - {a.get('name', 'unknown')} ({size_str})")
print()
except Exception as e: self.abort(f"Error viewing release: {e}")
finally:
if self.link: self.link.teardown()
def create_release(self, remote=None, target=None):
if not remote: print(f"No remote specified"); exit(1)
if not target: print(f"No target specified"); exit(1)
self.connect_remote(remote)
# TODO: Implement release listing
pass
timeout = self.link_timeout
while not self.link_ready and not self.link_failed and timeout > 0:
time.sleep(0.1)
timeout -= 1
if not self.link_ready: self.abort("Failed to establish link")
print("\r \r", end="")
try:
destination_hash, group, repo = self.parse_remote_url(remote)
repo_path = f"{group}/{repo}"
# Parse target - can be:
# 1. Just a tag name: "v1.0.0"
# 3. Tag with path to artifacts directory: "v1.0.0:/path/to/artifacts"
parts = target.split(":")
if len(parts) < 2: self.abort("Invalid release specification\nDid you provide both a tag and artifacts path such as \"1.0.0:./dist\"?")
tag = parts[0]
artifacts_path = os.path.expanduser(parts[1])
commit_hash = ""
if not os.path.isdir(artifacts_path): self.abort("Specified artifacts directory does not exist")
artifacts = [f for f in os.listdir(artifacts_path) if os.path.isfile(os.path.join(artifacts_path, f))]
if not artifacts: self.abort("No files found in specified artifact directory")
# Get release notes
print(f"Creating release {tag}")
notes = self._edit_release_notes(tag=tag)
if notes is None: print("Release creation cancelled"); return
# Step 1: Initialize release
print("Initializing release...")
request_data = { self.IDX_REPOSITORY: repo_path,
"operation": "create", "step": "init",
"tag": tag, "hash": commit_hash,
"notes": notes, "notes_format": "markdown" }
response, metadata = self.send_request(self.PATH_RELEASE, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote during release init")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Server error during init: {error_msg}")
print("Release initialized")
# Step 2: Upload artifacts
ms = "" if len(artifacts) == 1 else "s"
print(f"\nSending {len(artifacts)} artifact{ms}...")
for artifact in artifacts:
artifact_path = os.path.join(artifacts_path, artifact)
with open(artifact_path, "rb") as f: artifact_data = f.read()
request_data = { self.IDX_REPOSITORY: repo_path,
"operation": "create", "step": "artifact",
"tag": tag, "artifact_name": artifact,
"artifact_data": artifact_data }
response, metadata = self.send_request(self.PATH_RELEASE, request_data, timeout=300)
if not response or not isinstance(response, bytes) or response[0] != 0:
error_msg = response[1:].decode("utf-8", errors="ignore") if response else "Unknown error"
print(f" Failed to send {artifact}: {error_msg}")
else: print(f" {artifact} ({RNS.prettysize(len(artifact_data))}) transferred")
# Step 3: Finalize release
print("\nFinalizing release...")
request_data = { self.IDX_REPOSITORY: repo_path,
"operation": "create", "step": "finalize", "tag": tag }
response, metadata = self.send_request(self.PATH_RELEASE, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote during finalize")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Server error during finalize: {error_msg}")
print(f"Release {tag} published")
except Exception as e:
self.abort(f"Error creating release: {e}")
finally:
if self.link: self.link.teardown()
def delete_release(self, remote=None, target=None):
if not remote: print(f"No remote specified"); exit(1)
if not target: print(f"No target specified"); exit(1)
self.connect_remote(remote)
# TODO: Implement release listing
pass
timeout = self.link_timeout
while not self.link_ready and not self.link_failed and timeout > 0:
time.sleep(0.5)
timeout -= 1
if not self.link_ready: self.abort("Failed to establish link")
print("\r \r", end="")
try:
destination_hash, group, repo = self.parse_remote_url(remote)
repo_path = f"{group}/{repo}"
print(f"Are you sure you want to delete release {target}? [y/N]: ", end="")
try: confirm = input().strip().lower()
except EOFError: confirm = "n"
if confirm != "y":
print("Deletion cancelled")
return
request_data = { self.IDX_REPOSITORY: repo_path,
"operation": "delete", "tag": target }
response, metadata = self.send_request(self.PATH_RELEASE, request_data, timeout=30)
if not response or not isinstance(response, bytes): self.abort("No response from remote")
status_byte = response[0]
if status_byte != 0:
error_msg = response[1:].decode("utf-8", errors="ignore")
self.abort(f"Remote error: {error_msg}")
print(f"Release {target} deleted")
except Exception as e: self.abort(f"Error deleting release: {e}")
finally:
if self.link: self.link.teardown()
class ReticulumGitNode():
JOBS_INTERVAL = 5
@@ -533,7 +843,7 @@ class ReticulumGitNode():
def load_repository_group(self, group_name, group_path):
# TODO: Implement group.allowed file
if not group_name in self.groups: self.groups[group_name] = { "path": group_path, "repositories": {}, "read": [], "write": [], "create": [], "stats": [] }
if not group_name in self.groups: self.groups[group_name] = { "path": group_path, "repositories": {}, "read": [], "write": [], "create": [], "stats": [], "release": [] }
if group_name in self.groups and self.groups[group_name]["path"] != group_path:
RNS.log(f"Repository group path did not match existing entry while loading {group_name}, aborting load", RNS.LOG_ERROR)
return
@@ -958,19 +1268,289 @@ class ReticulumGitNode():
releases_path = f"{repository_path}.releases"
try:
# TODO: Implement release management handlers
if operation == "list" and read_access: pass
elif operation == "view" and read_access: pass
elif operation == "create" and release_access: pass
elif operation == "delete" and release_access: pass
if operation == "list" and read_access: return self._release_list(releases_path)
elif operation == "view" and read_access: return self._release_view(releases_path, data)
elif operation == "create" and release_access: return self._release_create(releases_path, repository_path, data, remote_identity)
elif operation == "delete" and release_access: return self._release_delete(releases_path, data)
else: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
return b"\x00"
except Exception as e:
RNS.log(f"Error while handling release request for {group_name}/{repository_name}: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def releases_list_data(self, releases_path):
try:
releases = []
if not os.path.isdir(releases_path): return releases
for entry in os.listdir(releases_path):
release_dir = os.path.join(releases_path, entry)
if not os.path.isdir(release_dir): continue
meta_path = os.path.join(release_dir, "META")
if not os.path.isfile(meta_path): continue
try:
meta = ConfigObj(meta_path)
release_info = { "tag": meta.get("tag", entry),
"hash": meta.get("hash", ""),
"created": meta.as_int("created") if "created" in meta else 0,
"status": meta.get("status", "unknown"),
"created_by": meta.get("created_by", "") }
notes_preview = ""
for notes_file in ["RELEASE.md", "RELEASE.mu"]:
notes_path = os.path.join(release_dir, notes_file)
if os.path.isfile(notes_path):
try:
with open(notes_path, "r", encoding="utf-8") as f:
first_line = f.readline().strip()
if first_line.startswith("#"): first_line = first_line.lstrip("#").strip()
notes_preview = first_line[:256]
except Exception: pass
break
release_info["preview"] = notes_preview
artifacts_dir = os.path.join(release_dir, "artifacts")
if os.path.isdir(artifacts_dir):
release_info["artifacts"] = len([f for f in os.listdir(artifacts_dir) if os.path.isfile(os.path.join(artifacts_dir, f))])
else: release_info["artifacts"] = 0
releases.append(release_info)
except Exception as e:
RNS.log(f"Error reading release metadata for {entry}: {e}", RNS.LOG_DEBUG)
continue
releases.sort(key=lambda x: x.get("created", 0), reverse=True)
return releases
except Exception as e:
RNS.log(f"Error listing releases for {releases_path}: {e}", RNS.LOG_ERROR)
return None
def _release_list(self, releases_path):
if not os.path.isdir(releases_path): return b"\x00" + mp.packb([])
releases = self.releases_list_data(releases_path)
if releases == None: return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error listing releases"
return b"\x00" + mp.packb(releases)
def release_data(self, release_dir, tag):
try:
meta_path = os.path.join(release_dir, "META")
if not os.path.isfile(meta_path):
RNS.log(f"Release metadata missing for {release_dir}/{tag}", RNS.LOG_ERROR)
return None
meta = ConfigObj(meta_path)
release_info = { "tag": meta.get("tag", tag),
"hash": meta.get("hash", ""),
"created": meta.as_int("created") if "created" in meta else 0,
"status": meta.get("status", "unknown"),
"created_by": meta.get("created_by", "") }
notes_content = ""
notes_format = "text"
for notes_file, fmt in [("RELEASE.md", "markdown"), ("RELEASE.mu", "micron")]:
notes_path = os.path.join(release_dir, notes_file)
if os.path.isfile(notes_path):
try:
with open(notes_path, "r", encoding="utf-8") as f: notes_content = f.read()
notes_format = fmt
except Exception as e: RNS.log(f"Error reading release notes: {e}", RNS.LOG_DEBUG)
break
release_info["notes"] = notes_content
release_info["notes_format"] = notes_format
artifacts = []
artifacts_dir = os.path.join(release_dir, "artifacts")
if os.path.isdir(artifacts_dir):
for artifact in os.listdir(artifacts_dir):
artifact_path = os.path.join(artifacts_dir, artifact)
if os.path.isfile(artifact_path):
artifacts.append({ "name": artifact, "size": os.path.getsize(artifact_path)})
release_info["artifacts"] = artifacts
thanks_path = os.path.join(release_dir, "THANKS")
thanks_count = 0
if os.path.isfile(thanks_path):
try:
with open(thanks_path, "rb") as f:
thanks_data = mp.unpackb(f.read())
thanks_count = thanks_data.get("count", 0)
except Exception: pass
release_info["thanks"] = thanks_count
return release_info
except Exception as e:
RNS.log(f"Error while getting release data for {release_dir}/{tag}: {e}", RNS.LOG_ERROR)
return None
def _release_view(self, releases_path, data):
tag = data.get("tag")
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No tag specified"
tag = os.path.basename(tag)
release_dir = os.path.join(releases_path, tag)
if not os.path.isdir(release_dir): return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Release not found"
release_info = self.release_data(release_dir, tag)
if not release_info: return self.RES_REMOTE_FAIL.to_bytes(1, "big") + b"Error getting release data"
return b"\x00" + mp.packb(release_info)
def _release_create(self, releases_path, repository_path, data, remote_identity):
step = data.get("step")
if not step: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
if step == "init": return self._release_create_init(releases_path, repository_path, data, remote_identity)
elif step == "artifact": return self._release_create_artifact(releases_path, data)
elif step == "finalize": return self._release_create_finalize(releases_path, data)
else: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid request"
def _release_create_init(self, releases_path, repository_path, data, remote_identity):
tag = data.get("tag")
commit_hash = data.get("hash")
notes = data.get("notes", "")
notes_format = data.get("notes_format", "markdown") # "markdown" or "micron"
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No tag specified"
tag = os.path.basename(tag)
if not tag or tag in [".", ".."]: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Invalid tag name"
try:
tag_check = subprocess.run(["git", "rev-parse", "--verify", f"refs/tags/{tag}"],
cwd=repository_path, capture_output=True, check=False)
if tag_check.returncode != 0: return self.RES_INVALID_REQ.to_bytes(1, "big") + f"Tag '{tag}' does not exist in repository".encode("utf-8")
if not os.path.isdir(releases_path): os.makedirs(releases_path, mode=0o755)
release_dir = os.path.join(releases_path, tag)
if os.path.isdir(release_dir): return self.RES_DISALLOWED.to_bytes(1, "big") + b"Release already exists"
os.makedirs(release_dir, mode=0o755)
os.makedirs(os.path.join(release_dir, "artifacts"), mode=0o755)
meta = ConfigObj()
meta.filename = os.path.join(release_dir, "META")
meta["tag"] = tag
if commit_hash: meta["hash"] = commit_hash
meta["created"] = int(time.time())
meta["status"] = "draft"
meta["created_by"] = RNS.hexrep(remote_identity.hash, delimit=False)
meta.write()
if notes:
notes_filename = "RELEASE.mu" if notes_format == "micron" else "RELEASE.md"
notes_path = os.path.join(release_dir, notes_filename)
with open(notes_path, "w", encoding="utf-8") as f: f.write(notes)
thanks_path = os.path.join(release_dir, "THANKS")
with open(thanks_path, "wb") as f: f.write(mp.packb({"count": 0}))
RNS.log(f"Created release {tag} in draft status", RNS.LOG_DEBUG)
return b"\x00"
except Exception as e:
RNS.log(f"Error creating release: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def _release_create_artifact(self, releases_path, data):
tag = data.get("tag")
artifact_name = data.get("artifact_name")
artifact_data = data.get("artifact_data")
if not tag or not artifact_name: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"Missing tag or artifact name"
if artifact_data is None: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No artifact data"
tag = os.path.basename(tag)
artifact_name = os.path.basename(artifact_name)
try:
release_dir = os.path.join(releases_path, tag)
if not os.path.isdir(release_dir): return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Release not found"
meta_path = os.path.join(release_dir, "META")
meta = ConfigObj(meta_path)
if meta.get("status") != "draft": return self.RES_DISALLOWED.to_bytes(1, "big") + b"Release was finalized and is not writable"
artifacts_dir = os.path.join(release_dir, "artifacts")
artifact_path = os.path.join(artifacts_dir, artifact_name)
if not os.path.isdir(artifacts_dir): os.makedirs(artifacts_dir, mode=0o755)
with open(artifact_path, "wb") as f:
if isinstance(artifact_data, str): f.write(artifact_data.encode("utf-8"))
else: f.write(artifact_data)
RNS.log(f"Added artifact {artifact_name} to release {tag}", RNS.LOG_DEBUG)
return b"\x00"
except Exception as e:
RNS.log(f"Error adding artifact: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def _release_create_finalize(self, releases_path, data):
tag = data.get("tag")
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No tag specified"
tag = os.path.basename(tag)
try:
release_dir = os.path.join(releases_path, tag)
if not os.path.isdir(release_dir): return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Release not found"
meta_path = os.path.join(release_dir, "META")
meta = ConfigObj(meta_path)
if meta.get("status") != "draft": return self.RES_DISALLOWED.to_bytes(1, "big") + b"Release was finalized and is not writable"
meta["status"] = "published"
meta["published_at"] = int(time.time())
meta.write()
RNS.log(f"Finalized release {tag}", RNS.LOG_DEBUG)
return b"\x00"
except Exception as e:
RNS.log(f"Error finalizing release: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def _release_delete(self, releases_path, data):
tag = data.get("tag")
if not tag: return self.RES_INVALID_REQ.to_bytes(1, "big") + b"No tag specified"
tag = os.path.basename(tag)
release_dir = os.path.join(releases_path, tag)
if not os.path.isdir(release_dir): return self.RES_NOT_FOUND.to_bytes(1, "big") + b"Release not found"
try:
shutil.rmtree(release_dir)
RNS.log(f"Deleted release {tag}", RNS.LOG_DEBUG)
return b"\x00"
except Exception as e:
RNS.log(f"Error deleting release: {e}", RNS.LOG_ERROR)
return self.RES_REMOTE_FAIL.to_bytes(1, "big") + str(e).encode("utf-8")
def repository_stats(self, remote_identity, group_name, repository_name, lookback_days=14):
if not self.resolve_permission(remote_identity, group_name, repository_name, self.PERM_STATS): return None
else:
@@ -1270,4 +1850,9 @@ loglevel = 4
'''.splitlines()
RELEASE_NOTES_TEMPLATE = """# Enter release notes for {TAG}.
# Lines starting with '#' will be ignored.
# Save and exit the editor when done, or exit without saving to abort.
"""
if __name__ == "__main__": main()