#!/usr/bin/env python3
-#
-# Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
-# Automatically figures out the last run and changelog contents with the GitHub API.
-#
+"""
+Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
+
+Automatically figures out the last run and changelog contents with the GitHub API.
+"""
-import io
import itertools
import os
+from pathlib import Path
+from typing import Any, Iterable
+
import requests
import yaml
-from typing import Any, Iterable
-GITHUB_API_URL = os.environ.get("GITHUB_API_URL", "https://api.github.com")
-GITHUB_REPOSITORY = os.environ["GITHUB_REPOSITORY"]
-GITHUB_RUN = os.environ["GITHUB_RUN_ID"]
-GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
+DEBUG = False
+DEBUG_CHANGELOG_FILE_OLD = Path("Resources/Changelog/Old.yml")
+GITHUB_API_URL = os.environ.get("GITHUB_API_URL", "https://api.github.com")
# https://discord.com/developers/docs/resources/webhook
DISCORD_SPLIT_LIMIT = 2000
CHANGELOG_FILE = "Resources/Changelog/Changelog.yml"
-TYPES_TO_EMOJI = {
- "Fix": "🐛",
- "Add": "🆕",
- "Remove": "❌",
- "Tweak": "⚒️"
-}
+TYPES_TO_EMOJI = {"Fix": "🐛", "Add": "🆕", "Remove": "❌", "Tweak": "⚒️"}
ChangelogEntry = dict[str, Any]
+
def main():
if not DISCORD_WEBHOOK_URL:
+ print("No discord webhook URL found, skipping discord send")
return
- session = requests.Session()
- session.headers["Authorization"] = f"Bearer {GITHUB_TOKEN}"
- session.headers["Accept"] = "Accept: application/vnd.github+json"
- session.headers["X-GitHub-Api-Version"] = "2022-11-28"
+ if DEBUG:
+ # to debug this script locally, you can use
+ # a separate local file as the old changelog
+ last_changelog_stream = DEBUG_CHANGELOG_FILE_OLD.read_text()
+ else:
+ # when running this normally in a GitHub actions workflow,
+ # it will get the old changelog from the GitHub API
+ last_changelog_stream = get_last_changelog()
- most_recent = get_most_recent_workflow(session)
- last_sha = most_recent['head_commit']['id']
- print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
- last_changelog = yaml.safe_load(get_last_changelog(session, last_sha))
+ last_changelog = yaml.safe_load(last_changelog_stream)
with open(CHANGELOG_FILE, "r") as f:
cur_changelog = yaml.safe_load(f)
diff = diff_changelog(last_changelog, cur_changelog)
- send_to_discord(diff)
+ message_lines = changelog_entries_to_message_lines(diff)
+ send_message_lines(message_lines)
-def get_most_recent_workflow(sess: requests.Session) -> Any:
- workflow_run = get_current_run(sess)
+def get_most_recent_workflow(
+ sess: requests.Session, github_repository: str, github_run: str
+) -> Any:
+ workflow_run = get_current_run(sess, github_repository, github_run)
past_runs = get_past_runs(sess, workflow_run)
- for run in past_runs['workflow_runs']:
+ for run in past_runs["workflow_runs"]:
# First past successful run that isn't our current run.
if run["id"] == workflow_run["id"]:
continue
return run
-def get_current_run(sess: requests.Session) -> Any:
- resp = sess.get(f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/actions/runs/{GITHUB_RUN}")
+def get_current_run(
+ sess: requests.Session, github_repository: str, github_run: str
+) -> Any:
+ resp = sess.get(
+ f"{GITHUB_API_URL}/repos/{github_repository}/actions/runs/{github_run}"
+ )
resp.raise_for_status()
return resp.json()
"""
Get all successful workflow runs before our current one.
"""
- params = {
- "status": "success",
- "created": f"<={current_run['created_at']}"
- }
+ params = {"status": "success", "created": f"<={current_run['created_at']}"}
resp = sess.get(f"{current_run['workflow_url']}/runs", params=params)
resp.raise_for_status()
return resp.json()
-def get_last_changelog(sess: requests.Session, sha: str) -> str:
+def get_last_changelog() -> str:
+ github_repository = os.environ["GITHUB_REPOSITORY"]
+ github_run = os.environ["GITHUB_RUN_ID"]
+ github_token = os.environ["GITHUB_TOKEN"]
+
+ session = requests.Session()
+ session.headers["Authorization"] = f"Bearer {github_token}"
+ session.headers["Accept"] = "Accept: application/vnd.github+json"
+ session.headers["X-GitHub-Api-Version"] = "2022-11-28"
+
+ most_recent = get_most_recent_workflow(session, github_repository, github_run)
+ last_sha = most_recent["head_commit"]["id"]
+ print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
+ last_changelog_stream = get_last_changelog_by_sha(
+ session, last_sha, github_repository
+ )
+
+ return last_changelog_stream
+
+
+def get_last_changelog_by_sha(
+ sess: requests.Session, sha: str, github_repository: str
+) -> str:
"""
Use GitHub API to get the previous version of the changelog YAML (Actions builds are fetched with a shallow clone)
"""
params = {
"ref": sha,
}
- headers = {
- "Accept": "application/vnd.github.raw"
- }
+ headers = {"Accept": "application/vnd.github.raw"}
- resp = sess.get(f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/contents/{CHANGELOG_FILE}", headers=headers, params=params)
+ resp = sess.get(
+ f"{GITHUB_API_URL}/repos/{github_repository}/contents/{CHANGELOG_FILE}",
+ headers=headers,
+ params=params,
+ )
resp.raise_for_status()
return resp.text
-def diff_changelog(old: dict[str, Any], cur: dict[str, Any]) -> Iterable[ChangelogEntry]:
+def diff_changelog(
+ old: dict[str, Any], cur: dict[str, Any]
+) -> Iterable[ChangelogEntry]:
"""
Find all new entries not present in the previous publish.
"""
def get_discord_body(content: str):
return {
- "content": content,
- # Do not allow any mentions.
- "allowed_mentions": {
- "parse": []
- },
- # SUPPRESS_EMBEDS
- "flags": 1 << 2
- }
+ "content": content,
+ # Do not allow any mentions.
+ "allowed_mentions": {"parse": []},
+ # SUPPRESS_EMBEDS
+ "flags": 1 << 2,
+ }
-def send_discord(content: str):
+def send_discord_webhook(lines: list[str]):
+ content = "".join(lines)
body = get_discord_body(content)
response = requests.post(DISCORD_WEBHOOK_URL, json=body)
response.raise_for_status()
-def send_to_discord(entries: Iterable[ChangelogEntry]) -> None:
- if not DISCORD_WEBHOOK_URL:
- print(f"No discord webhook URL found, skipping discord send")
- return
-
- message_content = io.StringIO()
- # We need to manually split messages to avoid discord's character limit
- # With that being said this isn't entirely robust
- # e.g. a sufficiently large CL breaks it, but that's a future problem
+def changelog_entries_to_message_lines(entries: Iterable[ChangelogEntry]) -> list[str]:
+ """Process structured changelog entries into a list of lines making up a formatted message."""
+ message_lines = []
- for name, group in itertools.groupby(entries, lambda x: x["author"]):
- # Need to split text to avoid discord character limit
- group_content = io.StringIO()
- group_content.write(f"**{name}** updated:\n")
+ for contributor_name, group in itertools.groupby(entries, lambda x: x["author"]):
+ message_lines.append(f"**{contributor_name}** updated:\n")
for entry in group:
+ url = entry.get("url")
+ if url and not url.strip():
+ url = None
+
for change in entry["changes"]:
- emoji = TYPES_TO_EMOJI.get(change['type'], "❓")
- message = change['message']
- url = entry.get("url")
- if url and url.strip():
- group_content.write(f"{emoji} - {message} [PR]({url}) \n")
+ emoji = TYPES_TO_EMOJI.get(change["type"], "❓")
+ message = change["message"]
+
+ # if a single line is longer than the limit, it needs to be truncated
+ if len(message) > DISCORD_SPLIT_LIMIT:
+ message = message[: DISCORD_SPLIT_LIMIT - 100].rstrip() + " [...]"
+
+ if url is not None:
+ line = f"{emoji} - {message} [PR]({url}) \n"
else:
- group_content.write(f"{emoji} - {message}\n")
-
- group_text = group_content.getvalue()
- message_text = message_content.getvalue()
- message_length = len(message_text)
- group_length = len(group_text)
-
- # If adding the text would bring it over the group limit then send the message and start a new one
- if message_length + group_length >= DISCORD_SPLIT_LIMIT:
- print("Split changelog and sending to discord")
- send_discord(message_text)
-
- # Reset the message
- message_content = io.StringIO()
-
- # Flush the group to the message
- message_content.write(group_text)
-
- # Clean up anything remaining
- message_text = message_content.getvalue()
- if len(message_text) > 0:
+ line = f"{emoji} - {message}\n"
+
+ message_lines.append(line)
+
+ return message_lines
+
+
+def send_message_lines(message_lines: list[str]):
+ """Join a list of message lines into chunks that are each below Discord's message length limit, and send them."""
+ chunk_lines = []
+ chunk_length = 0
+
+ for line in message_lines:
+ line_length = len(line)
+ new_chunk_length = chunk_length + line_length
+
+ if new_chunk_length > DISCORD_SPLIT_LIMIT:
+ print("Split changelog and sending to discord")
+ send_discord_webhook(chunk_lines)
+
+ new_chunk_length = line_length
+ chunk_lines.clear()
+
+ chunk_lines.append(line)
+ chunk_length = new_chunk_length
+
+ if chunk_lines:
print("Sending final changelog to discord")
- send_discord(message_text)
+ send_discord_webhook(chunk_lines)
-main()
+if __name__ == "__main__":
+ main()