]> git.smokeofanarchy.ru Git - space-station-14.git/commitdiff
Fixes and refactoring to discord changelog script (#33859)
authorimcb <irismessage@protonmail.com>
Tue, 11 Mar 2025 19:14:40 +0000 (19:14 +0000)
committerGitHub <noreply@github.com>
Tue, 11 Mar 2025 19:14:40 +0000 (20:14 +0100)
* Fixes and refactoring to discord changelog script

Upstreamed from https://github.com/impstation/imp-station-14/pull/1023

* Add some prints back in

Tools/actions_changelogs_since_last_run.py

index 89eca69b33a98a41388989a88370a16e79dd7c67..8261ad52391db1f4ea424fede4ae0ab777196c54 100755 (executable)
@@ -1,21 +1,22 @@
 #!/usr/bin/env python3
 
-#
-# Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
-# Automatically figures out the last run and changelog contents with the GitHub API.
-#
+"""
+Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
+
+Automatically figures out the last run and changelog contents with the GitHub API.
+"""
 
-import io
 import itertools
 import os
+from pathlib import Path
+from typing import Any, Iterable
+
 import requests
 import yaml
-from typing import Any, Iterable
 
-GITHUB_API_URL    = os.environ.get("GITHUB_API_URL", "https://api.github.com")
-GITHUB_REPOSITORY = os.environ["GITHUB_REPOSITORY"]
-GITHUB_RUN        = os.environ["GITHUB_RUN_ID"]
-GITHUB_TOKEN      = os.environ["GITHUB_TOKEN"]
+DEBUG = False
+DEBUG_CHANGELOG_FILE_OLD = Path("Resources/Changelog/Old.yml")
+GITHUB_API_URL = os.environ.get("GITHUB_API_URL", "https://api.github.com")
 
 # https://discord.com/developers/docs/resources/webhook
 DISCORD_SPLIT_LIMIT = 2000
@@ -23,39 +24,40 @@ DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL")
 
 CHANGELOG_FILE = "Resources/Changelog/Changelog.yml"
 
-TYPES_TO_EMOJI = {
-    "Fix":    "🐛",
-    "Add":    "🆕",
-    "Remove": "❌",
-    "Tweak":  "⚒️"
-}
+TYPES_TO_EMOJI = {"Fix": "🐛", "Add": "🆕", "Remove": "❌", "Tweak": "⚒️"}
 
 ChangelogEntry = dict[str, Any]
 
+
 def main():
     if not DISCORD_WEBHOOK_URL:
+        print("No discord webhook URL found, skipping discord send")
         return
 
-    session = requests.Session()
-    session.headers["Authorization"]        = f"Bearer {GITHUB_TOKEN}"
-    session.headers["Accept"]               = "Accept: application/vnd.github+json"
-    session.headers["X-GitHub-Api-Version"] = "2022-11-28"
+    if DEBUG:
+        # to debug this script locally, you can use
+        # a separate local file as the old changelog
+        last_changelog_stream = DEBUG_CHANGELOG_FILE_OLD.read_text()
+    else:
+        # when running this normally in a GitHub actions workflow,
+        # it will get the old changelog from the GitHub API
+        last_changelog_stream = get_last_changelog()
 
-    most_recent = get_most_recent_workflow(session)
-    last_sha = most_recent['head_commit']['id']
-    print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
-    last_changelog = yaml.safe_load(get_last_changelog(session, last_sha))
+    last_changelog = yaml.safe_load(last_changelog_stream)
     with open(CHANGELOG_FILE, "r") as f:
         cur_changelog = yaml.safe_load(f)
 
     diff = diff_changelog(last_changelog, cur_changelog)
-    send_to_discord(diff)
+    message_lines = changelog_entries_to_message_lines(diff)
+    send_message_lines(message_lines)
 
 
-def get_most_recent_workflow(sess: requests.Session) -> Any:
-    workflow_run = get_current_run(sess)
+def get_most_recent_workflow(
+    sess: requests.Session, github_repository: str, github_run: str
+) -> Any:
+    workflow_run = get_current_run(sess, github_repository, github_run)
     past_runs = get_past_runs(sess, workflow_run)
-    for run in past_runs['workflow_runs']:
+    for run in past_runs["workflow_runs"]:
         # First past successful run that isn't our current run.
         if run["id"] == workflow_run["id"]:
             continue
@@ -63,8 +65,12 @@ def get_most_recent_workflow(sess: requests.Session) -> Any:
         return run
 
 
-def get_current_run(sess: requests.Session) -> Any:
-    resp = sess.get(f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/actions/runs/{GITHUB_RUN}")
+def get_current_run(
+    sess: requests.Session, github_repository: str, github_run: str
+) -> Any:
+    resp = sess.get(
+        f"{GITHUB_API_URL}/repos/{github_repository}/actions/runs/{github_run}"
+    )
     resp.raise_for_status()
     return resp.json()
 
@@ -73,32 +79,55 @@ def get_past_runs(sess: requests.Session, current_run: Any) -> Any:
     """
     Get all successful workflow runs before our current one.
     """
-    params = {
-        "status": "success",
-        "created": f"<={current_run['created_at']}"
-    }
+    params = {"status": "success", "created": f"<={current_run['created_at']}"}
     resp = sess.get(f"{current_run['workflow_url']}/runs", params=params)
     resp.raise_for_status()
     return resp.json()
 
 
-def get_last_changelog(sess: requests.Session, sha: str) -> str:
+def get_last_changelog() -> str:
+    github_repository = os.environ["GITHUB_REPOSITORY"]
+    github_run = os.environ["GITHUB_RUN_ID"]
+    github_token = os.environ["GITHUB_TOKEN"]
+
+    session = requests.Session()
+    session.headers["Authorization"] = f"Bearer {github_token}"
+    session.headers["Accept"] = "Accept: application/vnd.github+json"
+    session.headers["X-GitHub-Api-Version"] = "2022-11-28"
+
+    most_recent = get_most_recent_workflow(session, github_repository, github_run)
+    last_sha = most_recent["head_commit"]["id"]
+    print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
+    last_changelog_stream = get_last_changelog_by_sha(
+        session, last_sha, github_repository
+    )
+
+    return last_changelog_stream
+
+
+def get_last_changelog_by_sha(
+    sess: requests.Session, sha: str, github_repository: str
+) -> str:
     """
     Use GitHub API to get the previous version of the changelog YAML (Actions builds are fetched with a shallow clone)
     """
     params = {
         "ref": sha,
     }
-    headers = {
-        "Accept": "application/vnd.github.raw"
-    }
+    headers = {"Accept": "application/vnd.github.raw"}
 
-    resp = sess.get(f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/contents/{CHANGELOG_FILE}", headers=headers, params=params)
+    resp = sess.get(
+        f"{GITHUB_API_URL}/repos/{github_repository}/contents/{CHANGELOG_FILE}",
+        headers=headers,
+        params=params,
+    )
     resp.raise_for_status()
     return resp.text
 
 
-def diff_changelog(old: dict[str, Any], cur: dict[str, Any]) -> Iterable[ChangelogEntry]:
+def diff_changelog(
+    old: dict[str, Any], cur: dict[str, Any]
+) -> Iterable[ChangelogEntry]:
     """
     Find all new entries not present in the previous publish.
     """
@@ -108,69 +137,75 @@ def diff_changelog(old: dict[str, Any], cur: dict[str, Any]) -> Iterable[Changel
 
 def get_discord_body(content: str):
     return {
-            "content": content,
-            # Do not allow any mentions.
-            "allowed_mentions": {
-                "parse": []
-            },
-            # SUPPRESS_EMBEDS
-            "flags": 1 << 2
-        }
+        "content": content,
+        # Do not allow any mentions.
+        "allowed_mentions": {"parse": []},
+        # SUPPRESS_EMBEDS
+        "flags": 1 << 2,
+    }
 
 
-def send_discord(content: str):
+def send_discord_webhook(lines: list[str]):
+    content = "".join(lines)
     body = get_discord_body(content)
 
     response = requests.post(DISCORD_WEBHOOK_URL, json=body)
     response.raise_for_status()
 
 
-def send_to_discord(entries: Iterable[ChangelogEntry]) -> None:
-    if not DISCORD_WEBHOOK_URL:
-        print(f"No discord webhook URL found, skipping discord send")
-        return
-
-    message_content = io.StringIO()
-    # We need to manually split messages to avoid discord's character limit
-    # With that being said this isn't entirely robust
-    # e.g. a sufficiently large CL breaks it, but that's a future problem
+def changelog_entries_to_message_lines(entries: Iterable[ChangelogEntry]) -> list[str]:
+    """Process structured changelog entries into a list of lines making up a formatted message."""
+    message_lines = []
 
-    for name, group in itertools.groupby(entries, lambda x: x["author"]):
-        # Need to split text to avoid discord character limit
-        group_content = io.StringIO()
-        group_content.write(f"**{name}** updated:\n")
+    for contributor_name, group in itertools.groupby(entries, lambda x: x["author"]):
+        message_lines.append(f"**{contributor_name}** updated:\n")
 
         for entry in group:
+            url = entry.get("url")
+            if url and not url.strip():
+                url = None
+
             for change in entry["changes"]:
-                emoji = TYPES_TO_EMOJI.get(change['type'], "❓")
-                message = change['message']
-                url = entry.get("url")
-                if url and url.strip():
-                    group_content.write(f"{emoji} - {message} [PR]({url}) \n")
+                emoji = TYPES_TO_EMOJI.get(change["type"], "❓")
+                message = change["message"]
+
+                # if a single line is longer than the limit, it needs to be truncated
+                if len(message) > DISCORD_SPLIT_LIMIT:
+                    message = message[: DISCORD_SPLIT_LIMIT - 100].rstrip() + " [...]"
+
+                if url is not None:
+                    line = f"{emoji} - {message} [PR]({url}) \n"
                 else:
-                    group_content.write(f"{emoji} - {message}\n")
-
-        group_text = group_content.getvalue()
-        message_text = message_content.getvalue()
-        message_length = len(message_text)
-        group_length = len(group_text)
-
-        # If adding the text would bring it over the group limit then send the message and start a new one
-        if message_length + group_length >= DISCORD_SPLIT_LIMIT:
-            print("Split changelog  and sending to discord")
-            send_discord(message_text)
-
-            # Reset the message
-            message_content = io.StringIO()
-
-        # Flush the group to the message
-        message_content.write(group_text)
-    
-    # Clean up anything remaining
-    message_text = message_content.getvalue()
-    if len(message_text) > 0:
+                    line = f"{emoji} - {message}\n"
+
+                message_lines.append(line)
+
+    return message_lines
+
+
+def send_message_lines(message_lines: list[str]):
+    """Join a list of message lines into chunks that are each below Discord's message length limit, and send them."""
+    chunk_lines = []
+    chunk_length = 0
+
+    for line in message_lines:
+        line_length = len(line)
+        new_chunk_length = chunk_length + line_length
+
+        if new_chunk_length > DISCORD_SPLIT_LIMIT:
+            print("Split changelog and sending to discord")
+            send_discord_webhook(chunk_lines)
+
+            new_chunk_length = line_length
+            chunk_lines.clear()
+
+        chunk_lines.append(line)
+        chunk_length = new_chunk_length
+
+    if chunk_lines:
         print("Sending final changelog to discord")
-        send_discord(message_text)
+        send_discord_webhook(chunk_lines)
 
 
-main()
+if __name__ == "__main__":
+    main()