# User:Lowercase sigmabot III/Source.py
# Source as of 09:16, 9 November 2013 (UTC).
#!/usr/bin/python3
import collections
import sys
import re
import itertools
import hashlib
from arrow import Arrow
from datetime import timedelta
from ceterach.api import MediaWiki
from ceterach.page import Page
from ceterach import passwords
import mwparserfromhell as mwp
# TODO: Fix the bugs
# TODO: unarchive threads
# t("Nationalism", "Hunter S. Thompson")
# ut("Mathmo")
#__all__ = ()
# Matches a signature timestamp such as "12:34, 9 November 2013 (UTC)".
STAMP_RE = re.compile(r"(\d\d:\d\d, .*? \d\d\d\d \(UTC\))")
# Matches a level-2 wikitext heading ("== Title ==") and nothing deeper.
HEAD_RE = re.compile("^== *([^=].*?) *== *$")
# Sentinel "newer than anything real" stamp for threads with no parsable dates.
THE_FUTURE = Arrow.utcnow() + timedelta(365)
# 1-indexed month names (index 0 unused) for %(monthname)s archive targets.
MONTHS = [None, "January", "February", "March", "April", "May", "June",
          "July", "August", "September", "October", "November", "December"
          ]
def mwp_parse(text):
    """Parse wikitext into a Wikicode tree, skipping style tags.

    A fresh Parser is built per call; skip_style_tags works around a
    parser quirk ("Earwig :(").
    """
    parser = mwp.parser.Parser()
    return parser.parse(text, skip_style_tags=True)
def _upfirst(part: str) -> str:
    # Uppercase only the first character, leaving the rest untouched
    # (str.capitalize would lowercase the remainder, which is wrong here).
    return part[0].upper() + part[1:] if part else part


def ucfirst(s: str):
    """Uppercase the first letter of a title and of its namespace prefix.

    "user talk:jimbo" -> "User talk:Jimbo".  Only the FIRST colon is
    treated as the namespace separator, so titles whose text contains
    further colons (e.g. "User talk:Foo: bar") are handled instead of
    tripping the old `assert s.count(":") == 1`, which also vanished
    entirely under `python -O`.
    """
    if ":" in s:
        ns, _, rest = s.partition(":")
        return _upfirst(ns) + ":" + _upfirst(rest)
    return _upfirst(s)
def get_nums(s: str):
    """Return only the ASCII digits of *s*, concatenated in order."""
    return re.sub("[^0-9]", "", s)
def make_key(title):
    """echo -en "${salt}\\n${title}" | md5sum

    Hash the local secret salt (read from the file "salt" in the working
    directory) plus a newline plus the page title.  The hex digest must
    match the config's |key= before off-page archive targets are honoured.
    """
    # Use a context manager: the old version leaked the open file handle.
    with open("salt", "rb") as salt_file:
        salt = salt_file.read()
    md5sum = hashlib.md5(salt + b"\n")
    md5sum.update(title.encode("utf8"))
    return md5sum.hexdigest()
def str2time(s: str):
    """Accepts a string defining a time period:
    7d - 7 days
    36h - 36 hours
    Returns the corresponding time, measured in seconds.

    A bare number is taken as seconds; suffixes are case-insensitive.
    """
    norm = s.lower()
    seconds_per = {'d': 24 * 3600, 'h': 3600}
    unit = norm[-1]
    if unit in seconds_per:
        return int(norm[:-1]) * seconds_per[unit]
    return int(norm)
def str2size(s: str):
    """Accepts a string defining a size:
    1337 - 1337 bytes
    150K - 150 kilobytes
    2M - 2 megabytes
    Returns a tuple (size,unit), where size is an integer and unit is
    'B' (bytes) or 'T' (threads)."""
    suffix = s[-1].lower()
    if suffix.isdigit():
        # No suffix at all: a plain byte count.
        return int(s), "B"
    if suffix == 'm':
        return int(s[:-1]) * 1024 * 1024, 'B'
    if suffix == 't':
        return int(s[:-1]), 'T'
    # 'k' — and any unrecognised suffix — is treated as kilobytes.
    return int(s[:-1]) * 1024, 'B'
class DiscussionPage(Page):
    """A talk page parsed into an archivable form.

    Extends ceterach's Page.  generate_threads() splits the wikitext into
    a "talk head" (everything before the first archivable thread) plus
    parallel lists of thread metadata (self.threads) and wikicode
    sections (self.sections); parse_stamps() then flags threads whose
    newest signature timestamp is older than the configured age.
    """

    def __init__(self, api: MediaWiki, title: str, archiver):
        super().__init__(api, title)
        self.archiver = archiver  # the Archiver that owns this page
        self.talkhead = ""        # wikitext kept at the top of the page
        self.threads = []         # dicts: header / content / stamp / oldenough
        self.sections = []        # wikicode sections, blanked when archived

    def reset(self):
        # Drop all parsed state so the page can be re-parsed from scratch.
        self.threads = []
        self.sections = []
        self.talkhead = ""

    def generate_threads(self):
        """Populate self.talkhead, self.threads and self.sections from self.content."""
        code = mwp_parse(self.content)
        # Take all sections after the 0th because the first section might
        # be the 'entrance hall' for the page.
        sects = iter(code.get_sections())
        self.talkhead = str(next(sects))  # Consume the lead section
        for section in sects:  # WT:TW
            # Level 3+ sections appearing before the first level 1/2 header
            # stay in the talk head; stop at the first level 1/2 heading.
            if section.get(0).level < 3:
                break
            self.talkhead += str(section)
        del sects  # Large talk pages will waste memory
        # Jasper has an annoying talk page
        # If there is a level 1 header, it probably has level 2 children.
        # Because get_sections(levels=[1, 2]) will yield the level 2 sections
        # later, we can just take the level 1 header and ignore its children.
        for section in code.get_sections(levels=[1, 2]):
            head = section.filter_headings()[0]
            if head.level == 1:
                section = section.get_sections(include_lead=False, flat=True)[0]
            d = {"header": "", "content": "",
                 "stamp": THE_FUTURE, "oldenough": False
                 }
            d['header'] = str(head)
            # Everything in the section after its heading is the thread body.
            d['content'] = str(section[len(head):])
            self.threads.append(d)
            self.sections.append(section)
        self.parse_stamps()  # Modify this if the wiki has a weird stamp format

    def parse_stamps(self, expr=STAMP_RE, fmt='%H:%M, %d %B %Y (%Z)'):
        """Record each thread's newest timestamp and whether it is old enough
        to archive, per the old(...) value in the archiver's |algo= setting.

        Raises Exception if |algo= is not of the form old(<period>).
        """
        stamps = []
        algo = self.archiver.config['algo']
        try:
            maxage = str2time(re.search(r"^old\((\w*)\)$", algo).group(1))
        except AttributeError:
            # re.search returned None: |algo= did not match old(...)
            raise Exception("Malformed archive configuration: " + self.title)
        for thread in self.threads:
            if not HEAD_RE.match(thread['header']):
                # the header is not level 2
                stamps = []
                continue
            for stamp in expr.findall(thread['content']):
                # This for loop can probably be optimised, but ain't nobody
                # got time fo' dat
                try:
                    stamps.append(Arrow.strptime(stamp, fmt))
                except ValueError:
                    # September has the highest len() of all the months
                    # Anything longer than this means that the regex broke.
                    continue  # Or any other fuckups about inconsistent dates
            try:
                # The most recent stamp should be used to see if we should archive
                most_recent = max(stamps)
                thread['stamp'] = most_recent
                thread['oldenough'] = Arrow.utcnow() - most_recent > timedelta(seconds=maxage)
            except ValueError:
                pass  # No stamps were found, abandon thread
            stamps = []

    def rebuild_talkhead(self, dry=False):
        """
        Specify the dry parameter if you only want to see if there's
        an archive template on the page.

        Raises Exception("No talk head") when no archive-config template
        is present; otherwise refreshes the on-page template's existing
        parameters (notably |counter=) inside self.talkhead.
        """
        new_tpl = self.archiver.generate_template()
        talkhead = mwp_parse(self.content).get_sections()[0]
        for talkhead_tpl_ref in talkhead.filter_templates():
            tpl_name = talkhead_tpl_ref.name.strip_code().strip()
            if ucfirst(tpl_name) == ucfirst(self.archiver.tl):
                # Found the archive-config template; keep the reference.
                break
        else:
            # No archive template anywhere in the lead section.
            print(self.title, "!!!!")
            for tpl in talkhead.filter_templates():
                print(tpl.name.strip(), end=', ')
            raise Exception("No talk head")
        if dry:
            return  # Our duty is done, and this function worked
        # Only overwrite parameters the on-page template already sets.
        for p in new_tpl.params:
            if talkhead_tpl_ref.has_param(p.name):
                talkhead_tpl_ref.add(p.name, p.value)
        self.talkhead = str(talkhead)
        del new_tpl, talkhead

    def update(self, archives_touched=None):
        """
        Remove the archives from the talk page after they have been archived
        """
        self.rebuild_talkhead()
        text = str(self.talkhead) + "".join(map(str, self.sections))
        # Instead of counting the sections in the archives, we can count the
        # sections we removed from the page
        arch_thread_count = len([sect for sect in self.sections if not sect])
        # Fancier edit summary stuff; the stray ")" closes the wikilink
        # context in the on-wiki summary rendering.
        summ = "Archiving {0} discussion(s) to {1}) (bot"
        titles = "/dev/null"
        if archives_touched:
            titles = ", ".join("[[" + tit + "]]" for tit in archives_touched)
        summ = summ.format(arch_thread_count, titles)
        if text != self.content:
            if not archives_touched:
                # The talk page changed but nothing went to the archives:
                # refuse to save a mystery diff.
                raise Exception("Nothing moved to archives")
            print(self.edit(text, summ, minor=True))
        # NOTE(review): indentation was lost in this copy; as reconstructed,
        # this raise is unconditional and fires even after a successful edit
        # (the caller catches it and logs to errlog).  Confirm whether it was
        # meant as the else-branch of the `if text != self.content` above.
        raise Exception("Nothing happened")
class Archiver:
    """Drives archiving for one talk page.

    Reads the on-page {{User:MiszaBot/config}} template, asks its
    DiscussionPage which threads are stale, appends them to the configured
    archive subpages, and finally rewrites the talk page without them.
    """

    def __init__(self, api: MediaWiki, title: str, tl="User:MiszaBot/config"):
        # Defaults; overwritten by the on-page template in generate_config().
        self.config = {'algo': 'old(24h)',
                       'archive': '',
                       'archiveheader': "{{Talk archive}}",
                       'maxarchivesize': '8796093022208M',
                       'minthreadsleft': '5',
                       'minthreadstoarchive': 1,
                       'counter': '1',
                       'key': '',
                       }
        self.api = api
        self.tl = tl  # name of the archive-config template
        self.archives_touched = []  # replaced by a frozenset in archive_threads()
        self.page = DiscussionPage(api, title, self)

    def generate_config(self):
        """Merge the first archive-config template on the page into self.config."""
        code = mwp_parse(self.page.content)
        template = code.filter_templates(matches=self.tl)[0]
        for p in template.params:
            self.config[p.name.strip()] = p.value.strip()
        arch_string = self.config['archive'].replace("_", " ").strip()
        self.config['archive'] = arch_string  # Normalise the archive titles
        try:
            # |counter= may contain junk around the digits; fall back to 1.
            self.config['counter'] = int(get_nums(self.config['counter']) or 1)
            self.config['minthreadstoarchive'] = int(self.config['minthreadstoarchive'])
        except Exception:
            print("Could not intify:", self.page.title)
            raise

    def generate_template(self):
        """Return a template with an updated counter"""
        # DONTFIXME: Preserve template formatting shit
        # This is only called so the params can be extracted.
        code = mwp.nodes.Template(self.tl)
        for paramname, val in self.config.items():
            code.add(paramname, val)
        return code

    def archive_threads(self):
        """Move the threads from the talk page to the archives."""
        keep_threads = int(self.config['minthreadsleft'])
        fmt_str = self.config['archive']
        max_arch_size = str2size(self.config['maxarchivesize'])
        arched_so_far = 0
        # Accumulate per-archive-title wikitext; edited in one pass afterwards.
        archives_to_touch = collections.defaultdict(str)
        # strftime() to create the keys for archives_to_touch
        # Values should be the text to append, text should be matched to
        # corresponding key based on where the thread belongs
        # Then iterate over .items() and edit the pages
        p = self.api.page("Coal ball")  # Crappy way to cache titles
        arch_thread_count, text = 0, ''  # This shuts up PyCharm
        # Archive the oldest threads first, not the highest threads
        # that happen to be old
        threads_with_indices = sorted(enumerate(self.page.threads), key=lambda t: t[1]['stamp'])
        for index, thread in threads_with_indices:
            if len(self.page.threads) - arched_so_far <= keep_threads:
                print("Keep at least {0} threads on {1}".format(keep_threads, self.page.title))
                break  # honour |minthreadsleft=
            if not thread["oldenough"]:
                continue  # Thread is too young to archive
            stamp = thread['stamp']
            print(thread['header'], "is old enough with stamp", stamp)
            # Substitutions available in the |archive= target pattern.
            params = {'counter': int(self.config['counter']),
                      'year': stamp.year,
                      'month': stamp.month,
                      'monthname': MONTHS[stamp.month],
                      'monthnameshort': MONTHS[stamp.month][:3],
                      'week': stamp.week,
                      }
            subpage = fmt_str % params
            # rFIXME: We're checking if THE TALK PAGE > max_arch_size, not the archive page
            # This is a crappy way to not waste API queries on .load_attributes()
            if p.title != subpage:
                p = self.api.page(subpage)
                try:
                    text = mwp_parse(p.content)
                except:  # Page doesn't exist, but I'm too lazy to NonexistentPageError
                    text = mwp_parse("")
                arch_thread_count = len(text.get_sections(levels=[2]))
            # Size of the existing archive plus what we already queued for it.
            arch_size = len(text) + len(archives_to_touch[subpage])
            if max_arch_size[1] == "T":
                # Size is measured in threads
                if arch_thread_count > max_arch_size[0]:
                    print("Increment counter")
                    self.config['counter'] += 1
                    continue
            elif max_arch_size[1] == "B":
                # Size is measured in bytes
                if arch_size > max_arch_size[0]:
                    print("Increment counter")
                    self.config['counter'] += 1
                    continue
            print("Archive subpage:", p.title)
            arched_so_far += 1
            archives_to_touch[subpage] += thread['header']
            archives_to_touch[subpage] += thread['content']
            # Remove this thread from the talk page
            self.page.sections[index] = ""
        archives_actually_touched = []
        for title, content in archives_to_touch.items():
            page = self.api.page(title)
            if arched_so_far < self.config['minthreadstoarchive']:
                # Not enough stale threads overall: honour |minthreadstoarchive=.
                print("arch_thread_count < min threads to archive")
                break
            arch_thread_count = len(mwp_parse(content).get_sections(levels=[2]))
            # TODO: this shit isn't updating for some reason
            summ = "Archiving {0} discussion(s) from [[{1}]]) (bot"
            summ = summ.format(arch_thread_count, self.page.title)
            if page.exists:
                page.append("\n\n" + content, summ, minor=True)
            else:
                # New archive page: prepend the configured archive header.
                content = self.config['archiveheader'] + "\n\n" + content
                page.create(content, summ, minor=True)
            archives_actually_touched.append(title)
        self.archives_touched = frozenset(archives_actually_touched)

    def key_ok(self):
        # True when the page's |key= matches the salted hash of its title.
        return self.config['key'] == make_key(self.page.title)

    def run(self):
        """Parse the page, archive stale threads, and rewrite the talk page."""
        self.generate_config()
        self.page.generate_threads()
        # NOTE(review): rebuild_talkhead() raises on a missing template and
        # its `return 0x1337` is commented out, so this branch never fires.
        if self.page.rebuild_talkhead(dry=True) == 0x1337:
            raise Exception("No talk header")
        if not self.config['archive'].startswith(self.page.title + "/"):
            # Archiving to a page that is NOT a subpage of this talk page
            # requires a valid salted key, so arbitrary pages can't be hit.
            if self.key_ok():
                self.archive_threads()
                self.page.update(self.archives_touched)
            else:
                raise Exception("Bad key")
        else:
            self.archive_threads()
            self.page.update(self.archives_touched)
if __name__ == "__main__":
    def page_gen_dec(ns):
        # Decorator factory: prefixes every title the wrapped generator
        # yields with "<ns>:".
        def decorator(func):
            def real_decorator(*pages):
                for shit in func(*pages):
                    yield ":".join([ns, shit])
            return real_decorator
        return decorator

    @page_gen_dec("User talk")
    def ut(*pgs):
        for s in pgs: yield s

    @page_gen_dec("Talk")
    def t(*pgs):
        for s in pgs: yield s

    @page_gen_dec("Wikipedia")
    def wp(*pgs):
        for s in pgs: yield s

    @page_gen_dec("Wikipedia talk")
    def wt(*pgs):
        for s in pgs: yield s

    api = MediaWiki("https://en.wikipedia.org/w/api.php", config={"retries": 2})
    api.login("Lowercase sigmabot III", passwords.lcsb3)
    api.set_token("edit")
    # Hard-coded page list; replaced by command-line arguments when given.
    victims = itertools.chain(ut("Quadell",
                                 "Legoktm",
                                 "BlueMoonset",
                                 "Jasper Deng",
                                 "Hym411",
                                 "The Earwig",
                                 ),
                              wp("Administrators' noticeboard/Edit warring",
                                 "Requests for undeletion",
                                 ),
                              t("RuneScape",
                                "Main Page",
                                ),
                              wt("Did you know",
                                 "Twinkle"
                                 ),
                              )
    # In production, enumerate every page transcluding the config template:
    #victims = (x['title'] for x in api.iterator(list='embeddedin', eititle='User:MiszaBot/config', eilimit=5000))
    if len(sys.argv) > 1:
        victims = sys.argv[1:]
    for v in victims:
        bot = Archiver(api, v)
        try:
            bot.run()
        except Exception as e:
            # Best-effort batch run: log the failure and continue with the
            # next page rather than aborting the whole run.
            with open("errlog", "a") as errlog:
                print(e, bot.page.title, file=errlog)