User:IngenuityBot/backlogdrive.py

from datetime import datetime, timezone
from urllib.parse import urlencode
import json
import re
import requests
import pywikibot


def query_url(query):
    # Build an enwiki API URL, percent-encoding each parameter so titles
    # and usernames containing spaces or "&" do not break the query.
    return "https://wiki.riteme.site/w/api.php?" + urlencode(query)


def get_users(participants_page):
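    # Pull the sign-up page's wikitext and return the listed participants.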
    data = requests.get(query_url({
        "action": "query",
        "prop": "revisions",
        "rvprop": "content",
        "titles": participants_page,
        "rvlimit": 1,
        "format": "json"
    })).json()["query"]["pages"]

    # The API keys results by page ID; take the single page returned.
    data = next(iter(data.values()))["revisions"][0]["*"]

    # Collect every {{user|...}} transclusion, skipping the first match,
    # which is assumed to be an example in the page's instructions rather
    # than a participant.
    return re.findall(r"{{user\|(.+?)}}", data, flags=re.IGNORECASE)[1:]


def get_metadata():
    # Load drive metadata (start date, page names, per-user fetch times).
    with open("metadata.json", "r") as f:
        return json.load(f)


def get_review_reasons(summary):
    # Extract the (code, description) reason pairs from an AFCH decline or
    # reject edit summary.
    return re.findall(r"(\w+) - ([ \w\[\]:|]+) (?:and|\()", summary)


def get_time(timestamp, date_format="%Y-%m-%dT%H:%M:%SZ"):
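    # Parse a MediaWiki timestamp and convert it to a Unix epoch, as UTC.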
    return datetime.strptime(timestamp, date_format).replace(tzinfo=timezone.utc).timestamp()


def current_time():
    # Return the current UTC time as a (MediaWiki timestamp, human-readable
    # time) pair, reading the clock once so both values agree.
    now = datetime.now(timezone.utc)
    return now.strftime("%Y%m%d%H%M%S"), now.strftime("%H:%M:%S, %d %B")


def get_reviews(user, since):
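    # Walk the user's contributions newest-to-oldest, collecting every AFCH
    # accept, decline, or reject made since `since`.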
    reviews = []
    start = "now"
    while True:
        edits = requests.get(query_url({
            "action": "query",
            "list": "usercontribs",
            "uclimit": 500,
            "format": "json",
            "ucuser": user,
            "ucstart": start,
            "ucprop": "ids|timestamp|comment|tags|user|sizes|title"
        })).json()["query"]["usercontribs"]

        for edit in edits:
            if get_time(edit["timestamp"]) < get_time(since):
                return reviews

            review = {
                "timestamp": edit["timestamp"],
                "revid": edit["revid"],
                "page": edit["title"],
                "type": "",
                "age": 0
            }

            edit["comment"] = edit["comment"] if "comment" in edit else ""

            # Check each edit summary to see whether it is an AFCH review;
            # match any AFCH version rather than pinning a single release.
            if re.match(r"Declining(.+?)\(\[\[WP:AFCH\|AFCH\]\] [\d.]+\)", edit["comment"]):
                review["type"] = "decline"
                review["reasons"] = get_review_reasons(edit["comment"])
            elif re.match(r"Rejecting(.+?)\(\[\[WP:AFCH\|AFCH\]\] [\d.]+\)", edit["comment"]):
                review["type"] = "reject"
                review["reasons"] = get_review_reasons(edit["comment"])
            elif re.match(r"Cleaning up accepted.+?\(\[\[WP:AFCH\|AFCH\]\] [\d.]+\)", edit["comment"]):
                review["type"] = "accept"

            if review["type"] == "":
                continue
            
            # Fetch the reviewed revision and its parent (rvlimit=2); the
            # parent holds the wikitext as it stood just before the review.
            wikitext = requests.get(query_url({
                "action": "query",
                "prop": "revisions",
                "rvprop": "content",
                "rvlimit": 2,
                "rvstartid": edit["revid"],
                "titles": edit["title"],
                "format": "json"
            })).json()["query"]["pages"]

            # If this fails, the revision has probably been revdel'd.
            try:
                # Take the last revision returned: the parent, whose wikitext
                # still carries the pre-review {{AfC submission}} templates.
                for item in wikitext:
                    wikitext = wikitext[item]["revisions"][-1]["*"]

                # The submission's age is the gap between its most recent
                # {{AfC submission}} timestamp and the review itself.
                review_templates = re.findall(r"{{afc submission\|.+?\|ts=(\d+)(?:.+?)?}}", wikitext, flags=re.IGNORECASE)
                review_templates = [int(x) for x in review_templates]
                review["age"] = get_time(edit["timestamp"]) - get_time(str(max(review_templates)), date_format="%Y%m%d%H%M%S")
            except (KeyError, IndexError, ValueError):
                # Missing revision text or no submission template; leave age at 0.
                pass

            reviews.append(review)

        # A short batch means we have reached the end of the user's
        # contributions; stop instead of re-requesting the same edits forever.
        if len(edits) < 500:
            return reviews

        start = edits[-1]["timestamp"]


def update_page(user, participants_page, reviews, wiki):
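    # Append the user's new reviews to their drive log subpage and save it.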
    content = requests.get(query_url({
        "action": "query",
        "prop": "revisions",
        "rvprop": "content",
        "titles": f"{participants_page}/{user}",
        "format": "json",
        "rvlimit": 1
    })).json()["query"]["pages"]

    # The API keys results by page ID; take the single page returned.
    content = next(iter(content.values()))

    # Start from an empty log if the user's page does not exist yet.
    content = "" if "missing" in content else content["revisions"][0]["*"]

    # The API returns newest edits first; log them in chronological order.
    reviews.reverse()

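    # Append one numbered log line per review; re-reviewers later annotate
    # these lines with bolded pass/fail/invalid notes on "#:" lines.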
    for review in reviews:
        if review["type"] in ["decline", "reject"]:
            reasons = ["{{abbr|" + reason[0] + "|" + reason[1] + "}}" for reason in review["reasons"]]
            content += f"\n# {'Declined' if review['type'][0] == 'd' else 'Rejected'} [[{review['page']}]] at {review['timestamp']} ([[Special:Diff/{review['revid']}|diff]]; {', '.join(reasons)}; had been pending for {round(review['age'] / 86400)} days)"
        else:
            content += f"\n# Accepted [[{review['page']}]] at {review['timestamp']} ([[Special:Diff/{review['revid']}|diff]]; had been pending for {round(review['age'] / 86400)} days)"
    
    page = pywikibot.Page(wiki, f"{participants_page}/{user}")
    page.text = content
    page.save("[[Wikipedia:Bots|Bot]]: Updating page with new reviews")


def update_leaderboard(users, participants_page, leaderboard_page, wiki):
    # for each user, calculate the total score, statistics, etc.
    # also subtract failed and invalid re-reviews
    user_statistics = {}
    for user in users:
        user_statistics[user] = {
            "score": 0,
            "total": 0,
            "accepted": 0,
            "declined": 0,
            "rejected": 0,
            "failed": 0,
            "passed": 0,
            "invalid": 0,
            "rereviews": 0
        }

    for user in users:
        content = requests.get(query_url({
            "action": "query",
            "prop": "revisions",
            "rvprop": "content",
            "titles": f"{participants_page}/{user}",
            "format": "json",
            "rvlimit": 1
        })).json()["query"]["pages"]

        # The API keys results by page ID; take the single page returned.
        content = next(iter(content.values()))
        content = "" if "missing" in content else content["revisions"][0]["*"]
        scores = []

        for line in content.split("\n"):
            try:
                if line.startswith("# "):
                    user_statistics[user][line.split(" ")[1].lower()] += 1
                    user_statistics[user]["total"] += 1

                    age = re.match(r".+?pending for (\d+) days", line)
                    age = int(age[1]) if age else 0

                    # One point per review, plus 0.5 for clearing a submission
                    # pending at least 30 days and another 0.5 for over 90.
                    score_to_add = 1

                    if age >= 30:
                        score_to_add += 0.5

                    if age > 90:
                        score_to_add += 0.5

                    scores.append(score_to_add)
                elif line.startswith("#:"):
                    # try to find a bolded re-review
                    review = re.match(r"#: ?'''(.+?)'''", line)
                    if not review:
                        continue
                    
                    review = review[1].lower()

                    if review == "invalid" or review == "fail":
                        scores[-1] = 0

                    if review == "fail":
                        user_statistics[user]["failed"] += 1

                    if review == "pass":
                        user_statistics[user]["passed"] += 1

                    if review == "invalid":
                        user_statistics[user]["invalid"] += 1

                    # Credit the re-review to the last user linked on the line.
                    review_user = re.findall(r"\[\[User:([^\|\]]+)", line)

                    if len(review_user) == 0:
                        continue

                    for u in user_statistics:
                        if u.lower() == review_user[-1].lower():
                            user_statistics[u]["rereviews"] += 1
                            break
            except (KeyError, IndexError, ValueError):
                # Skip malformed log lines instead of aborting the run.
                pass

        user_statistics[user]["score"] = sum(scores)

    # Each completed re-review is worth one extra point.
    for user in user_statistics:
        user_statistics[user]["score"] += user_statistics[user]["rereviews"]


    # create table with statistics
    now = current_time()
    time_ago = "{{time ago|" + now[0] + "}} (" + now[1] + ")"
    table = "Last updated " + time_ago + "\n{| class=\"wikitable sortable\"\n! Rank !! User !! Score !! Total !! Accepted !! Declined !! Rejected !! Re-reviews completed !! Passed !! Failed !! Invalid"

    # sort users by score
    user_statistics = {k: v for k, v in sorted(user_statistics.items(), key=lambda item: item[1]["score"], reverse=True)}
    index = 0

    for user in user_statistics:
        # Show a blank cell instead of a zero in the "Failed" column.
        failed = user_statistics[user]["failed"]
        if failed == 0:
            failed = ""
        index += 1
        # Display whole-number scores without a trailing ".0".
        score = user_statistics[user]["score"]
        score = int(score) if score == int(score) else score
        table += f"""
|-
| {index}
| [[User:{user}|{user}]] ([[User talk:{user}|talk]] &bull; [[{participants_page}/{user}|reviews]])
| {score}
| {user_statistics[user]["total"]}
| {user_statistics[user]["accepted"]}
| {user_statistics[user]["declined"]}
| {user_statistics[user]["rejected"]}
| {user_statistics[user]["rereviews"]}
| {user_statistics[user]["passed"]}
| {user_statistics[user]["failed"]}
| {user_statistics[user]["invalid"]}"""

    table += "\n|}"

    leaderboard = pywikibot.Page(wiki, leaderboard_page)
    leaderboard.text = table
    leaderboard.save(summary="[[Wikipedia:Bots|Bot]]: Updating leaderboard")


def main():
    # start pywikibot
    site = pywikibot.Site("en", "wikipedia")
    site.login()

    # get metadata about the drive - start & end dates, etc.
    metadata = get_metadata()

    # get the users participating in the drive
    users = get_users(metadata["participants_page"])

    # Users joining mid-drive start fetching from the drive's start date.
    for user in users:
        if user not in metadata["users_fetched"]:
            metadata["users_fetched"][user] = metadata["start_date"]

    # get the edits made by each user since last fetched
    for user in users:
        print(f"Fetching reviews for {user} from {metadata['users_fetched'][user]} to now...")
        reviews = get_reviews(user, metadata["users_fetched"][user])

        if len(reviews) > 0:
            update_page(user, metadata["participants_page"], reviews, site)

        metadata["users_fetched"][user] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        open("metadata.json", "w").write(json.dumps(metadata, indent="\t"))
    
    # update the metadata
    metadata["last_fetched"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    open("metadata.json", "w").write(json.dumps(metadata, indent="\t"))

    print("Updating leaderboard...")
    update_leaderboard(users, metadata["participants_page"], metadata["leaderboard_page"], site)


if __name__ == "__main__":
    main()