Jump to content

User:Lowercase sigmabot III/Source.py

From Wikipedia, the free encyclopedia
This is an old revision of this page, as edited by Lowercase sigmabot III (talk | contribs) at 09:16, 9 November 2013 (Updating source) (bot). The present address (URL) is a permanent link to this revision, which may differ significantly from the current revision.
(diff) ← Previous revision | Latest revision (diff) | Newer revision → (diff)

Source as of 09:16, 9 November 2013 (UTC).

#!/usr/bin/python3

import collections
import sys
import re
import itertools
import hashlib

from arrow import Arrow
from datetime import timedelta
from ceterach.api import MediaWiki
from ceterach.page import Page
from ceterach import passwords
import mwparserfromhell as mwp
# TODO: Fix the bugs
# TODO: unarchive threads
# t("Nationalism", "Hunter S. Thompson")
# ut("Mathmo")
#__all__ = ()
# Matches a signature timestamp such as "09:16, 9 November 2013 (UTC)".
# The single capture group is the whole stamp.
STAMP_RE = re.compile(r"(\d\d:\d\d, .*? \d\d\d\d \(UTC\))")
# Matches a level 2 heading line ("== Title =="); group 1 is the heading text.
# The [^=] right after the opening "==" rejects level 3 and deeper headings.
HEAD_RE = re.compile("^== *([^=].*?) *== *$")
# 365 days after the moment this module is loaded. Used as the default
# 'stamp' of a thread, so threads without a parsable stamp sort last.
THE_FUTURE = Arrow.utcnow() + timedelta(365)
# Month names indexed by month number (1-12); index 0 is a placeholder.
MONTHS = [None, "January", "February", "March", "April", "May", "June",
          "July", "August", "September", "October", "November", "December"
]


def mwp_parse(text):
    """Parse wikitext with mwparserfromhell, passing skip_style_tags=True."""
    # A Parser is instantiated directly so that the skip_style_tags
    # keyword can be handed to its parse() method.
    wikitext_parser = mwp.parser.Parser()
    return wikitext_parser.parse(text, skip_style_tags=True)


def ucfirst(s: str):
    """
    Upper-case the first character of a title and of its namespace prefix.

    Everything before the first colon is treated as the namespace. The
    namespace and the remainder each get their first character upper-cased;
    nothing else is changed. Any further colons are part of the page name
    and are left alone, so titles such as "user talk:foo/archive: 1" no
    longer trip an assertion.

    Returns the normalised string; the empty string is returned unchanged.
    """
    def capitalise(part):
        # Slicing (rather than indexing) keeps this safe for empty parts
        return part[:1].upper() + part[1:]

    namespace, colon, rest = s.partition(":")
    if not colon:
        return capitalise(s)
    return capitalise(namespace) + colon + capitalise(rest)


def get_nums(s: str):
    """Return the ASCII digits of s, in their original order, as one string."""
    # Deleting every non-digit leaves exactly the characters [0-9] matches
    return re.sub(r"[^0-9]", "", s)

def make_key(title):
    r"""Return the hex MD5 digest of the salt, a newline, and the title.

    Shell equivalent: echo -en "${salt}\n${title}" | md5sum

    The salt is read as raw bytes from the file named "salt" in the current
    working directory; the title is encoded as UTF-8.

    Raises OSError if the salt file cannot be read.
    """
    # Read the salt inside a context manager so the handle is closed promptly
    with open("salt", "rb") as salt_file:
        salt = salt_file.read()
    md5sum = hashlib.new("md5", salt + b"\n")
    md5sum.update(title.encode("utf8"))
    return md5sum.hexdigest()


def str2time(s: str):
    """Accepts a string defining a time period:
    7d - 7 days
    36h - 36 hours
    60 - 60 seconds (no suffix)
    Returns the corresponding time, measured in seconds.

    The suffix is case-insensitive and surrounding whitespace is ignored.
    Raises ValueError if the string is empty or the numeric part is not
    an integer."""
    s = s.strip().lower()
    if not s:
        # Without this guard s[-1] would raise an opaque IndexError
        raise ValueError("Empty time period")
    unit = s[-1]
    if unit == 'd':
        return int(s[:-1]) * 24 * 3600
    if unit == 'h':
        return int(s[:-1]) * 3600
    return int(s)


def str2size(s: str):
    """Turn a size specification into a (size, unit) tuple.

    1337 - 1337 bytes
    150K - 150 kilobytes
    2M - 2 megabytes
    10T - 10 threads
    The suffix is case-insensitive; any other suffix is handled like K.
    The unit in the result is 'B' (bytes) or 'T' (threads)."""
    suffix = s[-1].lower()
    if suffix.isdigit():
        # No suffix at all: the whole string is a byte count
        return int(s), "B"
    magnitude = int(s[:-1])
    if suffix == 't':
        return magnitude, 'T'
    multiplier = 1024 * 1024 if suffix == 'm' else 1024
    return magnitude * multiplier, 'B'


class DiscussionPage(Page):
    """A talk page split into a lead ("talk head") and archivable threads.

    `threads` and `sections` are parallel lists filled by
    generate_threads(): threads[i] is a dict describing sections[i].
    Entries of `sections` can be blanked to drop a thread; update() then
    saves the talk head followed by whatever sections remain.
    """

    def __init__(self, api: MediaWiki, title: str, archiver):
        super().__init__(api, title)
        # Owning archiver; this class reads its .config and .tl and calls
        # its generate_template()
        self.archiver = archiver
        # Text that precedes the first archivable thread
        self.talkhead = ""
        # Dicts with the keys 'header', 'content', 'stamp' and 'oldenough'
        self.threads = []
        # Parsed sections, index-aligned with self.threads
        self.sections = []

    def reset(self):
        """Forget everything generate_threads() collected."""
        self.threads = []
        self.sections = []
        self.talkhead = ""

    def generate_threads(self):
        """Parse the page content into self.talkhead, self.threads and
        self.sections, then fill in the thread timestamps."""
        code = mwp_parse(self.content)
        # Take all sections after the 0th because the first section might
        # be the 'entrance hall' for the page.
        sects = iter(code.get_sections())
        self.talkhead = str(next(sects))  # Consume the lead section
        for section in sects:  # WT:TW
            # Level 3+ sections directly after the lead stay in the talk head
            if section.get(0).level < 3:
                break
            self.talkhead += str(section)
        del sects  # Large talk pages will waste memory
        # Jasper has an annoying talk page
        # If there is a level 1 header, it probably has level 2 children.
        # Because get_sections(levels=[1, 2]) will yield the level 2 sections
        # later, we can just take the level 1 header and ignore its children.
        for section in code.get_sections(levels=[1, 2]):
            head = section.filter_headings()[0]
            if head.level == 1:
                section = section.get_sections(include_lead=False, flat=True)[0]
            d = {"header": "", "content": "",
                 "stamp": THE_FUTURE, "oldenough": False
            }
            d['header'] = str(head)
            d['content'] = str(section[len(head):])
            self.threads.append(d)
            self.sections.append(section)
        self.parse_stamps()  # Modify this if the wiki has a weird stamp format

    def parse_stamps(self, expr=STAMP_RE, fmt='%H:%M, %d %B %Y (%Z)'):
        """Set 'stamp' and 'oldenough' on every level 2 thread.

        expr is the compiled regex used to find timestamps in a thread's
        content and fmt the strptime format used to parse each match.
        A thread's stamp is its most recent timestamp; it is old enough
        when that stamp is older than the age given by the archiver's
        'algo' setting, which must look like "old(7d)".

        Raises Exception when the 'algo' setting is malformed.
        """
        algo = self.archiver.config['algo']
        algo_match = re.search(r"^old\((\w*)\)$", algo)
        if algo_match is None:
            raise Exception("Malformed archive configuration: " + self.title)
        try:
            maxage = str2time(algo_match.group(1))
        except (ValueError, IndexError) as err:
            # e.g. "old()" or "old(abc)": report which page is misconfigured
            raise Exception("Malformed archive configuration: " + self.title) from err
        for thread in self.threads:
            if not HEAD_RE.match(thread['header']):
                # the header is not level 2
                continue
            stamps = []
            for stamp in expr.findall(thread['content']):
                try:
                    stamps.append(Arrow.strptime(stamp, fmt))
                except ValueError:
                    # The regex matched something that is not a real date
                    continue
            if not stamps:
                continue  # No stamps were found, keep the defaults
            # The most recent stamp should be used to see if we should archive
            most_recent = max(stamps)
            thread['stamp'] = most_recent
            thread['oldenough'] = Arrow.utcnow() - most_recent > timedelta(seconds=maxage)

    def rebuild_talkhead(self, dry=False):
        """
        Refresh the archive configuration template in the lead section
        and store the resulting lead text in self.talkhead.

        Specify the dry parameter if you only want to see if there's
        an archive template on the page.

        Raises Exception("No talk head") when the lead section does not
        contain the archiver's configuration template.
        """
        new_tpl = self.archiver.generate_template()
        talkhead = mwp_parse(self.content).get_sections()[0]
        for talkhead_tpl_ref in talkhead.filter_templates():
            tpl_name = talkhead_tpl_ref.name.strip_code().strip()
            if ucfirst(tpl_name) == ucfirst(self.archiver.tl):
                break
        else:
            # Dump the templates that were found to help with debugging
            print(self.title, "!!!!")
            for tpl in talkhead.filter_templates():
                print(tpl.name.strip(), end=', ')
            raise Exception("No talk head")
        if dry:
            return  # Our duty is done, and this function worked
        # Only overwrite parameters the on-page template already has
        for p in new_tpl.params:
            if talkhead_tpl_ref.has_param(p.name):
                talkhead_tpl_ref.add(p.name, p.value)
        self.talkhead = str(talkhead)

    def update(self, archives_touched=None):
        """
        Save the talk page without the threads that have been archived.

        archives_touched is an iterable of archive page titles used in the
        edit summary; "/dev/null" is shown when it is empty or None.

        Raises Exception("Nothing happened") when the rebuilt text equals
        the current content, and Exception("Nothing moved to archives")
        when the text changed although no archive was touched.
        """
        self.rebuild_talkhead()
        text = str(self.talkhead) + "".join(map(str, self.sections))
        # Instead of counting the sections in the archives, we can count the
        # sections we removed from the page
        arch_thread_count = sum(1 for sect in self.sections if not sect)
        # The odd parentheses are deliberate and must be kept verbatim
        summ = "Archiving {0} discussion(s) to {1}) (bot"
        titles = "/dev/null"
        if archives_touched:
            titles = ", ".join("[[" + tit + "]]" for tit in archives_touched)
        summ = summ.format(arch_thread_count, titles)
        if text == self.content:
            raise Exception("Nothing happened")
        if not archives_touched:
            # The talk page was edited, but nothing was moved to the archives
            raise Exception("Nothing moved to archives")
        # A successful edit ends here instead of falling through to the
        # "Nothing happened" error as it used to
        print(self.edit(text, summ, minor=True))


class Archiver:
    """Archive the old threads of one talk page.

    The settings come from a configuration template on the page (by default
    "User:MiszaBot/config"); run() is the entry point.
    """

    def __init__(self, api: MediaWiki, title: str, tl="User:MiszaBot/config"):
        # Defaults; generate_config() overrides them with the on-page values
        self.config = {'algo': 'old(24h)',
                       'archive': '',
                       'archiveheader': "{{Talk archive}}",
                       'maxarchivesize': '8796093022208M',
                       'minthreadsleft': '5',
                       'minthreadstoarchive': 1,
                       'counter': '1',
                       'key': '',
        }
        self.api = api
        self.tl = tl  # Name of the configuration template
        self.archives_touched = []  # Set by archive_threads()
        self.page = DiscussionPage(api, title, self)

    def generate_config(self):
        """Read the configuration template from the page into self.config.

        The archive title pattern is normalised (underscores become
        spaces) and 'counter' and 'minthreadstoarchive' become ints.

        Raises Exception when the page has no configuration template, and
        re-raises whatever int() raised when a value cannot be converted.
        """
        code = mwp_parse(self.page.content)
        templates = code.filter_templates(matches=self.tl)
        if not templates:
            raise Exception("No archive configuration template: " + self.page.title)
        for p in templates[0].params:
            self.config[p.name.strip()] = p.value.strip()
        arch_string = self.config['archive'].replace("_", " ").strip()
        self.config['archive'] = arch_string  # Normalise the archive titles
        try:
            # A counter without any digit falls back to 1
            self.config['counter'] = int(get_nums(self.config['counter']) or 1)
            self.config['minthreadstoarchive'] = int(self.config['minthreadstoarchive'])
        except Exception:
            print("Could not intify:", self.page.title)
            raise

    def generate_template(self):
        """Return a template with an updated counter"""
        # DONTFIXME: Preserve template formatting
        # This is only called so the params can be extracted.
        code = mwp.nodes.Template(self.tl)
        for paramname, val in self.config.items():
            code.add(paramname, val)
        return code

    def archive_threads(self):
        """Move the threads from the talk page to the archives.

        Threads are handled oldest first. Archived threads are blanked in
        self.page.sections and the titles of the archive pages that were
        written end up in self.archives_touched (a frozenset).
        """
        keep_threads = int(self.config['minthreadsleft'])
        fmt_str = self.config['archive']
        max_size, size_unit = str2size(self.config['maxarchivesize'])
        arched_so_far = 0
        # Maps archive title -> text to append to that archive
        archives_to_touch = collections.defaultdict(str)
        # Most recently loaded archive page, so consecutive threads going to
        # the same archive do not reload it
        p = None
        arch_thread_count, text = 0, ''
        # Archive the oldest threads first, not the highest threads
        # that happen to be old
        threads_with_indices = sorted(enumerate(self.page.threads), key=lambda t: t[1]['stamp'])
        for index, thread in threads_with_indices:
            if len(self.page.threads) - arched_so_far <= keep_threads:
                print("Keep at least {0} threads on {1}".format(keep_threads, self.page.title))
                break
            if not thread["oldenough"]:
                continue  # Thread is too young to archive
            stamp = thread['stamp']
            print(thread['header'], "is old enough with stamp", stamp)
            params = {'counter': int(self.config['counter']),
                      'year': stamp.year,
                      'month': stamp.month,
                      'monthname': MONTHS[stamp.month],
                      'monthnameshort': MONTHS[stamp.month][:3],
                      'week': stamp.week,
            }
            subpage = fmt_str % params
            if p is None or p.title != subpage:
                p = self.api.page(subpage)
                try:
                    text = mwp_parse(p.content)
                except Exception:
                    # Best effort: an archive that cannot be read (for
                    # instance because it does not exist yet) counts as empty
                    text = mwp_parse("")
                arch_thread_count = len(text.get_sections(levels=[2]))
            # .get() instead of indexing: indexing the defaultdict would add
            # an empty entry for a full archive, which would later receive
            # an empty "Archiving 0 discussion(s)" edit
            arch_size = len(text) + len(archives_to_touch.get(subpage, ""))
            if size_unit == "T":
                # Size is measured in threads
                # NOTE(review): threads queued in this run are not counted
                # here - confirm whether that is intended
                archive_full = arch_thread_count > max_size
            else:
                # Size is measured in bytes
                archive_full = arch_size > max_size
            if archive_full:
                # The thread is skipped for now; the raised counter points
                # later threads at the next archive page
                print("Increment counter")
                self.config['counter'] += 1
                continue
            print("Archive subpage:", p.title)
            arched_so_far += 1
            archives_to_touch[subpage] += thread['header']
            archives_to_touch[subpage] += thread['content']
            # Remove this thread from the talk page
            self.page.sections[index] = ""
        archives_actually_touched = []
        if archives_to_touch and arched_so_far < self.config['minthreadstoarchive']:
            # Too few threads qualified: write nothing to the archives
            print("arch_thread_count < min threads to archive")
        else:
            for title, content in archives_to_touch.items():
                page = self.api.page(title)
                arch_thread_count = len(mwp_parse(content).get_sections(levels=[2]))
                # The odd parentheses are deliberate and must be kept verbatim
                summ = "Archiving {0} discussion(s) from [[{1}]]) (bot"
                summ = summ.format(arch_thread_count, self.page.title)
                if page.exists:
                    page.append("\n\n" + content, summ, minor=True)
                else:
                    content = self.config['archiveheader'] + "\n\n" + content
                    page.create(content, summ, minor=True)
                archives_actually_touched.append(title)
        self.archives_touched = frozenset(archives_actually_touched)

    def key_ok(self):
        """Return True if the configured key matches make_key() for this page."""
        return self.config['key'] == make_key(self.page.title)

    def run(self):
        """Load the configuration, archive old threads and save the page.

        Raises Exception("Bad key") when the archive is not a subpage of
        the talk page and the configured key does not match. Exceptions
        raised by the individual steps propagate to the caller.
        """
        self.generate_config()
        self.page.generate_threads()
        # Dry run: raises when the lead has no configuration template
        self.page.rebuild_talkhead(dry=True)
        archive_is_subpage = self.config['archive'].startswith(self.page.title + "/")
        # Archiving to somewhere other than a subpage requires a valid key
        if not archive_is_subpage and not self.key_ok():
            raise Exception("Bad key")
        self.archive_threads()
        self.page.update(self.archives_touched)


if __name__ == "__main__":
    def page_gen_dec(ns):
        """Return a decorator whose wrapped generator yields "<ns>:<title>"
        for every title the decorated generator yields."""
        def decorator(func):
            def real_decorator(*pages):
                for bare_title in func(*pages):
                    yield ns + ":" + bare_title
            return real_decorator
        return decorator

    # One title generator per namespace; each simply echoes its arguments
    # and leaves the prefixing to the decorator.
    @page_gen_dec("User talk")
    def ut(*pgs):
        yield from pgs

    @page_gen_dec("Talk")
    def t(*pgs):
        yield from pgs

    @page_gen_dec("Wikipedia")
    def wp(*pgs):
        yield from pgs

    @page_gen_dec("Wikipedia talk")
    def wt(*pgs):
        yield from pgs

    api = MediaWiki("https://en.wikipedia.org/w/api.php", config={"retries": 2})
    api.login("Lowercase sigmabot III", passwords.lcsb3)
    api.set_token("edit")

    # Default work list, used when no titles are given on the command line
    user_talk_pages = ut(
        "Quadell",
        "Legoktm",
        "BlueMoonset",
        "Jasper Deng",
        "Hym411",
        "The Earwig",
    )
    project_pages = wp(
        "Administrators' noticeboard/Edit warring",
        "Requests for undeletion",
    )
    article_talk_pages = t(
        "RuneScape",
        "Main Page",
    )
    project_talk_pages = wt(
        "Did you know",
        "Twinkle",
    )
    victims = itertools.chain(
        user_talk_pages,
        project_pages,
        article_talk_pages,
        project_talk_pages,
    )
    # Alternative work list: every page that transcludes the config template
    #victims = (x['title'] for x in api.iterator(list='embeddedin', eititle='User:MiszaBot/config', eilimit=5000))
    cli_titles = sys.argv[1:]
    if cli_titles:
        # Full page titles passed as arguments replace the default list
        victims = cli_titles
    for v in victims:
        bot = Archiver(api, v)
        try:
            bot.run()
        except Exception as e:
            # Record the failure and carry on with the next page
            with open("errlog", "a") as errlog:
                print(e, bot.page.title, file=errlog)