User:RussBot/dabmaintbot.py

#!/usr/bin/python
"""
dabmaintbot - Bot to update link counts on
[[en:Wikipedia:Disambiguation pages maintenance]]
"""

import datetime
import locale
import re, sys, traceback
import simplejson
import urllib
import wikipedia, pagegenerators

locale.setlocale(locale.LC_ALL, '')

# Constants:
ACTIVE_CUTOFF = 100
HISTORY_LEN = 6

started_at = datetime.datetime.now()

# cache page objects to reduce server load
pagecache = {}

def getPage(title):
    """Return a (cached) Page object for title; the cache is keyed by the
    section-free form of the title to avoid refetching the same page."""
    global pagecache
    if '#' in title:
        sf_title = title[:title.index('#')]
    else:
        sf_title = title
    return pagecache.setdefault(sf_title, wikipedia.Page(site, title))

def cacheput(page):
    global pagecache
    title = page.sectionFreeTitle()
    pagecache[title] = page

def prefetch(page):
    """Load the page text now, retrying on BadTitle errors and ignoring other
    wikipedia errors so callers can test the page afterwards."""
    while True:
        try:
            page.get(get_redirect=True)
            return
        except wikipedia.BadTitle:
            wikipedia.output("Got BadTitle exception on %s; retrying."
                             % page.title())
            continue
        except wikipedia.Error:
            return

def refcount(page):
    """Return the number of mainspace pages linking to page, including links
    that arrive via redirects, using the API backlinks generator."""
    if hasattr(page, "refcount"):
        return page.refcount
    data = {'action': 'query',
            'generator': 'backlinks',
            'gbltitle': page.sectionFreeTitle(),
            'gblnamespace': '0',
            'gbllimit': '500',
            'redirects': 'redirects',
            'format': 'json',
            }
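    # A rough sketch (assumed, not captured from a live request) of the JSON
    # the loop below consumes: redirects to the page are recursed into, every
    # other entry under 'pages' adds one to the count, and any parameters in
    # 'query-continue' are folded back into the next request.
    #
    #   {"query": {"redirects": [{"from": "Quicksilver", "to": "Mercury"}],
    #              "pages": {"736": {"title": "Earth"}}},
    #    "query-continue": {"backlinks": {"gblcontinue": "..."}}}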
    count = 0
    while True:
        wikipedia.get_throttle()
        try:
            # wikipedia.output("Getting references to %s" % page.aslink())
            reflist = site.getUrl(site.api_address(), data=data)
        except:
            traceback.print_exc(file=sys.stderr)
            continue
        try:
            result = simplejson.loads(reflist)
        except ValueError:
            continue
        if type(result) is not dict or 'query' not in result:
            return 0
        if 'redirects' in result['query']:
            for redirect in result['query']['redirects']:
                if redirect['to'] == page.sectionFreeTitle():
                    count += refcount(wikipedia.Page(site, redirect['from'])) 
        if 'pages' in result['query']:
            for ref_id in result['query']['pages']:
                refpage = result['query']['pages'][ref_id]
                if refpage['title'] != page.sectionFreeTitle():
                    count += 1
        if "query-continue" in result:
            data.update(result['query-continue']['backlinks'])
        else:
            return count

def increasing(seq):
    '''Return True if seq is strictly increasing when read from last element
    to first (histories are stored newest-first); False otherwise.'''
    for index in range(len(seq) - 1):
        if seq[index] <= seq[index+1]:
            return False
    return True
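# Behaviour sketch for increasing(), with hypothetical counts stored newest
# first: increasing([30, 20, 10]) is True (the count has risen on every run),
# while increasing([30, 30, 10]) and increasing([10, 20, 30]) are False.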

def fmt(num):
    return locale.format("%i", num, grouping=True)
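# For example, assuming setlocale() above picked up an en_US-style locale,
# fmt(1234567) returns '1,234,567'; under the plain "C" locale no grouping
# separators are added.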

try:
    site = wikipedia.getSite()

    #input pages
    maint_page = wikipedia.Page(site,
                    "Wikipedia:Disambiguation pages with links/Current list")
    dump_page = wikipedia.Page(site,
                    "User:RussBot/DPL")
    problem_page = wikipedia.Page(site,
                    "Wikipedia:Disambiguation pages with links/problems")
    #output pages
    result_page = wikipedia.Page(site,
                    "Wikipedia:Disambiguation pages with links/Current list")
    problem_result = wikipedia.Page(site,
                    "Wikipedia:Disambiguation pages with links/problems")

    for arg in sys.argv[1:]:
        arg = wikipedia.argHandler(arg, 'dabmaintbot')
        if arg:
            print "Unrecognized command line argument: %s" % arg
            # show help text and exit
            wikipedia.argHandler("-help", "dabmaintbot")

    mylang = site.language()

    fixed_pages = 0
    fixed_links = 0
    problems = []
    m_text = maint_page.get()

    active_r = re.compile(
        r"^# (?:'''&bull; )?\[\[(.+)\]\] *\(([0-9]*) *" +
        r"\[\[Special:Whatlinkshere/(?:.+)\|links\]\]\) *" +
        r"(?:\((?:(?:new)|(?:[-+][0-9]+))\))? *" +
        r"(?:<!-- history (.*?)-->)? *(.*?) *(?:''')? *$", re.M)
    # the groups matched by this regex are:
    # 1.  the title of a disambiguation page
    # 2.  the number of links found last time the bot ran (may be empty)
    # 3.  the history of the page's link count (may be empty), consisting of a
    #     space-separated string of numbers
    # 4.  any notes added by users at the end of the line
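    # An illustrative line this pattern matches (page name and counts hypothetical):
    #   # '''&bull; [[Mercury]] (350 [[Special:Whatlinkshere/Mercury|links]]) (+125)<!-- history 350 225 200--> see talk'''
    # capturing "Mercury", "350", "350 225 200" and "see talk".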

    inactive_r = re.compile(
        r'^# \[\[(.+)\]\] \(([0-9]+)\) history ([0-9 ]*):(.*) *$', re.M)
    # the groups matched by this regex are the same as for active_r
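    # An illustrative inactive line (hypothetical):
    #   # [[Mercury (mythology)]] (12) history 12 15 20: needs review
    # capturing the title, count, history string and trailing note.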

    # lists are demarcated by HTML comments
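    # Illustratively (section name hypothetical), the maintenance page is laid
    # out as:
    #   <!-- section title=General -->
    #   # [[Mercury]] (350 [[Special:Whatlinkshere/Mercury|links]]) ...
    #   <!-- end section -->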

    # Step 1: Collect all links and histories from the last scan
    
    start_mark = u"<!-- section title="
    end_mark = u"<!-- end section -->"
    marker = 0
    new_text = []
    disambiglinks = {}
    # [active pages, links to active pages, inactive pages, links to inactive pages]
    total_count = [0, 0, 0, 0]
    sections = []
    diffs = []
    while True:
        section_start = m_text.find(start_mark, marker)
        if section_start == -1:
            break
        title_mark = section_start + len(start_mark)
        section_title = m_text[title_mark:
                               m_text.find(u" -->\n", title_mark)]
        section_marker = title_mark + len(section_title) + len(" -->\n")
        if section_marker >= len(m_text):
            wikipedia.output(
                u"ERROR: no content found after title of section %s"
                % section_title)
            raise RuntimeError
        
        section_end = m_text.find(end_mark, section_marker)
        if section_end == -1:
            wikipedia.output(
                u"ERROR: cannot locate end of section %s" % section_title)
            raise RuntimeError
        marker = section_end
        sections.append((section_title, section_marker, section_end))
        sectionnumber = len(sections) - 1
        
        for item in active_r.finditer(m_text, section_marker, section_end):
            link_page_title = item.group(1)
            link_page = getPage(link_page_title)
            try:
                prefetch(link_page)
                while link_page.isRedirectPage():
                    link_page = link_page.getRedirectTarget()
                    prefetch(link_page)
                if not link_page.isDisambig():
                    continue
            except wikipedia.NoPage:
                continue
            link_page_title = link_page.sectionFreeTitle()
            if link_page_title in disambiglinks:
                continue
            count = refcount(link_page)
            wikipedia.output(u"%s [%i]" % (link_page.title(), count))
            if item.group(3):
                history = item.group(3)
            else:
                history = u''
            disambiglinks[link_page_title] = {
                'section': sectionnumber,
                'title': link_page_title,
                'count': count,
                'history_text': history,
                'trailing_text': item.group(4).strip()
            }

        # search for inactive listings, which should always follow active ones
        for item in inactive_r.finditer(m_text, section_marker, section_end):
            link_page_title = item.group(1)
            link_page = getPage(link_page_title)
            try:
                prefetch(link_page)
                while link_page.isRedirectPage():
                    link_page = link_page.getRedirectTarget()
                    prefetch(link_page)
                if not link_page.isDisambig():
                    continue
            except wikipedia.NoPage:
                continue
            # use the section-free title so keys match the active list above
            link_page_title = link_page.sectionFreeTitle()
            if link_page_title in disambiglinks:
                continue
            count = refcount(link_page)
            wikipedia.output(u"%s [%i]" % (link_page.title(), count))
            if item.group(3):
                history = item.group(3)
            else:
                history = u''
            disambiglinks[link_page_title] = {
                'section': sectionnumber,
                'title': link_page_title,
                'count': count,
                'history_text': history,
                'trailing_text': item.group(4).strip()
            }

    # Step 2.  Collect links from data dump output page and add any that
    # aren't already in the collection

    for link_page in dump_page.linkedPages():
        try:
            prefetch(link_page)
            while link_page.isRedirectPage():
                link_page = link_page.getRedirectTarget()
                prefetch(link_page)
            if not link_page.isDisambig():
                continue
        except wikipedia.NoPage:
            continue
        link_page_title = link_page.sectionFreeTitle()
        if link_page_title in disambiglinks:
            continue
        count = refcount(link_page)
        wikipedia.output(u"%s [%i]" % (link_page.title(), count))
        history = u''
        disambiglinks[link_page_title] = {
            'section': 0,  # All new articles go into 'general' until classified
            'title': link_page_title,
            'count': count,
            'history_text': history,
            'trailing_text': u''
        }

    # Step 3.  Sort links by section and count, and output page
    marker = 0
    for (number, (section_name, section_marker, section_end)
         ) in enumerate(sections):
        section_links = [link for link in disambiglinks.values()
                         if link['section'] == number]
        section_links.sort(key=lambda i:i['count'], reverse=True)
        section_count = [0, 0]
        new_text.append(m_text[marker:section_marker])
        active = True
        for link in section_links:
            if link['count'] < ACTIVE_CUTOFF and active:
                active = False
                new_text.append(u"<!-- Inactive articles:\n")
            if link['history_text']:
                history = [int(n) for n in link['history_text'].split(" ")]
            else:
                history = []
            history = [link['count']] + history
            while len(history) > HISTORY_LEN:
                del history[-1]
            if len(history) == 1:
                link['diff'] = 'new'
            else:
                link['diff'] = "%+i" % (history[0] - history[1])
                diffs.append( (history[0]-history[1], link['title']) )
                if history[0] < history[1]:
                    fixed_pages += 1
                    fixed_links += (history[1] - history[0])
            link['history_text'] = " ".join(str(x) for x in history)
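            # Worked example with hypothetical counts: if the stored
            # history_text was "225 200 180" and the new count is 350, history
            # becomes [350, 225, 200, 180], diff is "+125", and history_text is
            # rewritten as "350 225 200 180" (at most HISTORY_LEN values kept).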
##            print link[1]+":", history
            
            if max(history) < ACTIVE_CUTOFF / 4:
                # discard items that have no significant history
                continue

            if active:
                section_count[0] += 1
                section_count[1] += link['count']
                item = (
u"[[%(title)s]] (%(count)i [[Special:Whatlinkshere/%(title)s|links]]) " +
u"(%(diff)s)<!-- history %(history_text)s--> %(trailing_text)s") % link
                # bullet items that have shown unusual or persistent increases
                if (len(history) > 1 and
                        history[0]-history[1] > ACTIVE_CUTOFF / 2
                   ) or (
                        len(history) == HISTORY_LEN and
                        increasing(history) and
                        history[0] - history[-1] > ACTIVE_CUTOFF
                   ):
                    prefix = "'''&bull; "
                    suffix = "'''"
                    # strip trailing quote marks so the bold suffix does not double up
                    item = item.rstrip("'")
                    problems.append(
u"* [[%(title)s]] (%(count)i [[Special:Whatlinkshere/%(title)s|links]]) (%(diff)s)\n"
                        % link)
                else:
                    prefix = suffix = ""
                new_text.append("# %s%s%s\n" % (prefix, item, suffix))
            else:
                total_count[2] += 1
                total_count[3] += link['count']
                new_text.append(
u"# [[%(title)s]] (%(count)i) history %(history_text)s: %(trailing_text)s\n"
                    % link)
        if not active:
            new_text.append("-->\n")
        marker = section_end
        new_text.append(
            u"\n Section '%s' contains %i links to %i active articles.\n" %
            (section_name, section_count[1], section_count[0]))
        total_count[0] += section_count[0]
        total_count[1] += section_count[1]

    diffs.sort()
    statistics_point = m_text.find(u"|}")
    if statistics_point >= 0:
        text = m_text[marker:statistics_point]
        text = re.sub(r"(?s)<!--banner-->.*?<!--/banner-->",
"""<!--banner-->
'''''Since last week, at least %s links to %s pages have been fixed!'''''
<!--/banner-->"""
                          % (fmt(fixed_links), fmt(fixed_pages)), text)
        top10 = ["\n===Top 10 increases==="]
        for item in reversed(diffs[-10:]):
            top10.append("# [[%s]] (%i)" % (item[1], item[0]))
        top10.append("===Top 10 decreases===")
        for item in diffs[:10]:
            top10.append("# [[%s]] (%i)" % (item[1], item[0]))
        top10.append("<!--/banner-->")
        text = text.replace("<!--/banner-->", "\n".join(top10))
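        # Sketch of the resulting banner block (figures and titles hypothetical,
        # up to ten entries per list):
        #   <!--banner-->
        #   '''''Since last week, at least 1,234 links to 56 pages have been fixed!'''''
        #
        #   ===Top 10 increases===
        #   # [[Mercury]] (125)
        #   ===Top 10 decreases===
        #   # [[Venus]] (-90)
        #   <!--/banner-->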
        new_text.append(text)
        marker = statistics_point
        new_text.append(u"|-\n")
        today = datetime.date.today()
        new_text.append(u"| %4i-%02i-%02i || %s || %s || %s || %s\n"
                        % (today.year, today.month, today.day,
                           fmt(total_count[0]+total_count[2]),
                           fmt(total_count[0]),
                           fmt(total_count[1]+total_count[3]),
                           fmt(total_count[1])))
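        # The appended statistics row renders as, e.g. (values hypothetical):
        #   |-
        #   | 2009-05-17 || 12,345 || 678 || 456,789 || 123,456
        # i.e. date, all listed pages, active pages, all links, links to
        # active pages.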
        
    new_text.append(m_text[marker:])
    wikipedia.setAction(u"Disambiguation page maintenance script")
    result_page.put(u"".join(new_text))
    prob_text = problem_page.get()
    header_start = prob_text.index("<noinclude>")
    header_end = prob_text.index("</noinclude>") + len("</noinclude>")
    problem_result.put(prob_text[header_start:header_end] + "\n" +
                      u"".join(problems))
    
finally:
    elapsed = datetime.datetime.now() - started_at
    print "elapsed time = " + str(elapsed)
    wikipedia.stopme()