# Wiki page: User:Qwerfjkl (bot)/code/CategoryMonitor
import time, re, pywikibot
from pywikibot.comms.eventstreams import EventStreams
from mwparserfromhell import parse
import json
# Per-language settings are kept on-wiki as JSON so they can be edited
# without touching this script.
config_page = pywikibot.Page(
    pywikibot.Site('en', 'wikipedia'),
    'User:Qwerfjkl (bot)/CategoryMonitorConfig.json',
)
config = json.loads(config_page.text)

# Replace each language's configured 'site' value with the pywikibot Site
# object built from it, so later code can use config[lang]['site'] directly.
for lang, _lang_settings in config.items():
    _lang_settings['site'] = pywikibot.Site(_lang_settings['site'])

# Start the recentchange stream at the current server time of the 'en' site
# (the stream's base time is taken from enwiki).
_stream_start = config['en']['site'].server_time()
stream = EventStreams(streams=['recentchange'], since=_stream_start)
def stream_filter(data):
    """Return True if a recentchange event is a monitored category addition.

    An event passes when all of the following hold:

    * its ``type`` is ``'categorize'`` and it was not made by a bot;
    * its ``server_name`` ends with ``.wikipedia.org`` and the leading
      label (the language code) is a key of the module-level ``config``;
    * its ``title`` (the category) is listed in that language's
      ``CS1CATS``;
    * that language's ``removedmessage`` does not occur in the event
      ``comment`` (i.e. the page was not removed from the category).

    Events that lack any of these keys are rejected instead of raising
    ``KeyError``, so one odd event cannot stop the stream consumer.

    Args:
        data: The event dictionary yielded by the event stream.

    Returns:
        bool: Whether the event should be collected for notification.
    """
    # Cheapest and most selective test first: most events are not
    # categorisation events at all.
    if data.get('type') != 'categorize' or data.get('bot'):
        return False

    server_name = data.get('server_name') or ''
    if not server_name.endswith('.wikipedia.org'):
        return False

    lang = server_name.split('.')[0]
    settings = config.get(lang)
    if settings is None:
        return False

    return (
        data.get('title') in settings['CS1CATS']
        and settings['removedmessage'] not in (data.get('comment') or '')
    )
# The filter is not registered on the stream (stream.register_filter);
# the main loop calls stream_filter() on every event itself.

# Change tags that mark an edit as an undo/revert or as already reverted.
# Revisions carrying any of these tags are not notified about.
SKIPTAGS = [
    'mw-undo',
    'mw-revert',
    'mw-rollback',
    'mw-reverted',
    'mw-manual-revert',
]

# Timestamp/server of the previously seen event; False until one is seen.
prev = False

# Matching changes collected for the event burst currently being grouped.
group = []
# A categorize comment starts with a link to the page whose categories changed.
_CHANGED_PAGE_LINK = re.compile(r"\[\[:(.+?)\]\]")


def _changed_title(comment):
    """Return the page title linked at the start of *comment*, or None."""
    match = _CHANGED_PAGE_LINK.match(comment or '')
    return match.group(1) if match else None


def _group_changes_by_title(changes):
    """Map each changed page title to its events, in first-seen order."""
    grouped = {}
    for change in changes:
        title = _changed_title(change.get('comment'))
        if title is None:
            # Without the leading link there is no way to tell which page
            # the event belongs to, so it cannot be notified about.
            print('Skipped, no page link in comment', change.get('comment'))
            continue
        grouped.setdefault(title, []).append(change)
    return grouped


def _cause_link(lang, category):
    """Return the 'target|label' link text describing *category* for *lang*."""
    if lang == 'en':
        if category == 'Category:CS1 errors: bare URLCategory:CS1 errors: missing title':
            # Merged entry: closes the first link and opens a second one.
            return 'Category:CS1 errors: bare URL|bare URL]] and [[:Category:CS1 errors: missing title|missing title'
        return category + '|' + category.replace('Category:CS1 errors: ', '')
    if lang == 'sq':
        # Label is everything after the second colon; otherwise no label.
        if category.count(':') >= 2:
            return category + '|' + category.split(':', 2)[2].lstrip()
        return category
    return category


def _has_opted_out(params, topic='cs1-errors'):
    """Return True if template *params* contain ``optout=`` listing *topic*."""
    for param in params:
        name, sep, value = param.partition('=')
        if not sep or name.strip() != 'optout':
            continue
        if topic in (item.strip() for item in value.split(',')):
            return True
    return False


def notify(data):
    """Post talk-page notices for the changes collected in ``group``.

    The events in the module-level ``group`` list are split by the page
    named at the start of each event comment. For every page that was
    added to at least one configured category, the function:

    1. skips pages that are blank, missing or outside namespace 0;
    2. finds the single revision whose timestamp equals the event
       timestamp, skipping the page if there is none or more than one, or
       if the revision carries a tag from ``SKIPTAGS``;
    3. sleeps until 15 minutes after the edit, then reloads the page and
       keeps only the categories the page is still in;
    4. skips users whose talk page has a Bots/Nobots template with
       ``optout=`` listing ``cs1-errors``;
    5. appends a new section, built from the per-language strings in
       ``config``, to the user's talk page and saves it.

    Changes are attributed to a page by exact title. Substring matching
    would wrongly merge the events of "Foo" and "Foo bar".

    Args:
        data: Mapping with ``'timestamp'`` (ISO string of the edit) and
            ``'server_name'`` (e.g. ``'en.wikipedia.org'``) shared by all
            events in ``group``.

    Returns:
        None. Save failures are printed, not raised.
    """
    print('Handling case')
    ts = data['timestamp']
    lang = data['server_name'].replace('.wikipedia.org', '')
    settings = config[lang]
    site = settings['site']
    if not group:
        return

    changes_by_title = _group_changes_by_title(group)
    print(list(changes_by_title))
    timestamp = pywikibot.Timestamp.fromISOformat(ts, sep='T')

    for title, changes in changes_by_title.items():
        # dict.fromkeys de-duplicates while keeping a stable order.
        cat_changes = list(dict.fromkeys(
            change['title'] for change in changes
            if change['title'] in settings['CS1CATS']
            and settings['removedmessage'] not in change['comment']
        ))
        if not cat_changes:
            continue

        user = changes[0]['user']
        page = pywikibot.Page(site, title)
        if not page.text or page.namespace().id != 0:  # blank, non-existent, or not in mainspace
            print('Bad page title', page.title())
            continue

        # todo: use 'notify_url': 'https://sq.wikipedia.org/w/index.php?diff=2643546&oldid=1587213'
        revisions = [rev for rev in page.revisions() if rev['timestamp'] == timestamp]
        if len(revisions) != 1:
            print('Error, multiple/no edits with that timestamp', page.title())
            continue
        revision = revisions[0]
        if any(tag in revision['tags'] for tag in SKIPTAGS):
            print('Skipped, skiptags')
            continue
        revid = revision['revid']
        print(f'Special:Diff/{revid}')

        # Seconds left until 15 minutes after the edit, by server time.
        edit_seconds = int(float(timestamp.posix_timestamp_format()))
        server_now = pywikibot.Timestamp.fromISOformat(site.server_time(), sep='T')
        now_seconds = int(float(server_now.posix_timestamp_format()))
        time_diff = 60 * 15 + edit_seconds - now_seconds
        if time_diff > 0:
            print(f'Sleeping {time_diff} seconds')
            time.sleep(time_diff)
        else:
            print(time_diff)
        print('Sleep over, checking')

        page.get(force=True)
        pagecats = [cat.title() for cat in page.categories()]
        print(pagecats, cat_changes)
        current_errors = [cat for cat in cat_changes if cat in pagecats]  # errors still present on page
        if not current_errors:
            print('Error has been fixed, skipping')
            continue

        # The count is taken before the two enwiki categories below are
        # merged into a single entry.
        count = str(len(current_errors))
        if (lang == 'en'
                and 'Category:CS1 errors: bare URL' in current_errors
                and 'Category:CS1 errors: missing title' in current_errors):
            current_errors.remove('Category:CS1 errors: bare URL')
            current_errors.remove('Category:CS1 errors: missing title')
            current_errors.insert(0, 'Category:CS1 errors: bare URLCategory:CS1 errors: missing title')

        notif_page = pywikibot.Page(site, 'User talk:' + user)
        if notif_page.isRedirectPage():
            notif_page = notif_page.getRedirectTarget()

        # Allow optout. The namespace prefix is dropped from the template
        # title so the check also works where "Template:" is localised.
        opted_out = any(
            template.title(with_ns=False) in ('Bots', 'Nobots') and _has_opted_out(params)
            for template, params in notif_page.templatesWithParams()
        )
        if opted_out:
            print('Skipped, user opted-out')
            continue

        header = f"== {settings['sectionheader'].format(title=title)} =="
        top = f"\n{{{{subst:User:Qwerfjkl (bot)/inform/top|count={count}|page={title}|diff={str(revid)}}}}}"
        middle = "\n".join(
            f"{{{{subst:User:Qwerfjkl (bot)/inform/middle|causes={settings['causesmessage'].format(cause=_cause_link(lang, category))}|cat={category}|page={title}|diff={str(revid)}}}}}"
            for category in current_errors
        )
        end = f"\n{{{{subst:User:Qwerfjkl (bot)/inform/bottom|page={title}|diff={str(revid)}}}}}"
        text = header + top + middle + end

        notif_page.text = notif_page.text + '\n' + text
        try:
            # The summary is built inside the try so that a bad config
            # string is reported like a save failure instead of crashing.
            summary = f"/* {parse(settings['sectionheader'].format(title=title)).strip_code()} */ {settings['editsummary'].format(title=title)}"
            notif_page.save(summary, botflag=False)
        except Exception as e:
            print(f"Exception on {page}: {e}")
        else:
            print(f'[[{title}]] was added to: {", ".join(cat_changes)} at {ts} by {user}')
# Main loop: consume the event stream and collect matching category
# changes that share a timestamp and server into `group`. When an event
# arrives that does not belong to the open group, the group is handed to
# notify() and cleared.
print('Edit handler started')
for change in stream:
    # Evaluate the filter once per event rather than at every use.
    is_relevant = stream_filter(change)
    if is_relevant:
        print(change)
    current = {'timestamp': change['meta']['dt'], 'server_name': change['server_name']}

    if is_relevant and prev and current == prev:
        # Same timestamp and server as the previous event: same burst.
        print('Added change to list to notify')
        group.append(change)
    else:
        if group:
            print('Notifying and clearing')
            # Whenever group is non-empty, prev is the timestamp/server of
            # its most recently appended change.
            notify(prev)
            group.clear()
        if is_relevant:
            # Start a new group with this change. This also covers the case
            # where no group was open and the preceding event had a
            # different timestamp or server; such a change used to be lost.
            group.append(change)
    prev = current