User:RussBot/dabmaintbot.py
#!/usr/bin/python
"""
dabmaintbot - Bot to update link counts on
[[en:Wikipedia:Disambiguation pages maintenance]]
"""
import datetime
import locale
import re, sys, traceback
import simplejson
import urllib

import wikipedia, pagegenerators

locale.setlocale(locale.LC_ALL, '')

# Constants:
ACTIVE_CUTOFF = 100
HISTORY_LEN = 6

started_at = datetime.datetime.now()

# cache page objects to reduce server load
pagecache = {}

def getPage(title):
    # Return a cached Page object, keyed by the section-free form of the title.
    global pagecache
    if '#' in title:
        sf_title = title[:title.index('#')]
    else:
        sf_title = title
    return pagecache.setdefault(sf_title, wikipedia.Page(site, title))

def cacheput(page):
    # Store a page in the cache under its section-free title.
    global pagecache
    title = page.sectionFreeTitle()
    pagecache[title] = page

def prefetch(page):
    # Load the page text, retrying on BadTitle and ignoring other errors.
    while True:
        try:
            page.get(get_redirect=True)
            return
        except wikipedia.BadTitle:
            wikipedia.output("Got BadTitle exception on %s; retrying."
                             % page.title())
            continue
        except wikipedia.Error:
            return

def refcount(page):
    # Count main-namespace backlinks to page via the API backlinks generator,
    # adding in links that arrive through redirects and skipping self-links.
    if hasattr(page, "refcount"):
        return page.refcount
    data = {'action': 'query',
            'generator': 'backlinks',
            'gbltitle': page.sectionFreeTitle(),
            'gblnamespace': '0',
            'gbllimit': '500',
            'redirects': 'redirects',
            'format': 'json',
            }
    count = 0
    while True:
        wikipedia.get_throttle()
        try:
            # wikipedia.output("Getting references to %s" % page.aslink())
            reflist = site.getUrl(site.api_address(), data=data)
        except:
            traceback.print_exc(file=sys.stderr)
            continue
        try:
            result = simplejson.loads(reflist)
        except ValueError:
            continue
        if type(result) is not dict or not result.has_key('query'):
            return 0
        if result['query'].has_key('redirects'):
            for redirect in result['query']['redirects']:
                if redirect['to'] == page.sectionFreeTitle():
                    count += refcount(wikipedia.Page(site, redirect['from']))
        if result['query'].has_key('pages'):
            for ref_id in result['query']['pages']:
                refpage = result['query']['pages'][ref_id]
                if refpage['title'] != page.sectionFreeTitle():
                    count += 1
        if result.has_key("query-continue"):
            data.update(result['query-continue']['backlinks'])
        else:
            return count

def increasing(seq):
    '''Return True if seq is uniformly increasing (from last to first),
    False otherwise'''
    for index in range(len(seq) - 1):
        if seq[index] <= seq[index+1]:
            return False
    return True

def fmt(num):
    # Format a number with locale-appropriate digit grouping.
    return locale.format("%i", num, grouping=True)

try:
    site = wikipedia.getSite()

    # input pages
    maint_page = wikipedia.Page(site,
        "Wikipedia:Disambiguation pages with links/Current list")
    dump_page = wikipedia.Page(site, "User:RussBot/DPL")
    problem_page = wikipedia.Page(site,
        "Wikipedia:Disambiguation pages with links/problems")
    # output pages
    result_page = wikipedia.Page(site,
        "Wikipedia:Disambiguation pages with links/Current list")
    problem_result = wikipedia.Page(site,
        "Wikipedia:Disambiguation pages with links/problems")

    for arg in sys.argv[1:]:
        arg = wikipedia.argHandler(arg, 'dabmaintbot')
        if arg:
            print "Unrecognized command line argument: %s" % arg
            # show help text and exit
            wikipedia.argHandler("-help", "dabmaintbot")

    mylang = site.language()
    fixed_pages = 0
    fixed_links = 0
    problems = []

    m_text = maint_page.get()
    active_r = re.compile(
        r"^# (?:'''• )?\[\[(.+)\]\] *\(([0-9]*) *"
        + r"\[\[Special:Whatlinkshere/(?:.+)\|links\]\]\) *"
        + r"(?:\((?:(?:new)|(?:[-+][0-9]+))\))? *"
        + r"(?:<!-- history (.*?)-->)? *(.*?) *(?:''')? *$",
        re.M)
    # the groups matched by this regex are:
    # 1. the title of a disambiguation page
    # 2. the number of links found last time the bot ran (may be empty)
    # 3. the history of the page's link count (may be empty), consisting of a
    #    space-separated string of numbers
    # 4. any notes added by users at the end of the line
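    # Hypothetical example (the title and numbers are made up, not taken from
    # the live list): active_r would match a listing line such as
    #   # [[Mercury]] (250 [[Special:Whatlinkshere/Mercury|links]]) (+12)<!-- history 250 238 230--> see talk page
    # giving group(1) = "Mercury", group(2) = "250",
    # group(3) = "250 238 230" and group(4) = "see talk page".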
    inactive_r = re.compile(
        r'^# \[\[(.+)\]\] \(([0-9]+)\) history ([0-9 ]*):(.*) *$', re.M)
    # the groups matched by this regex are the same as for active_r

    # lists are demarcated by HTML comments

    # Step 1: Collect all links and histories from the last scan
    start_mark = u"<!-- section title="
    end_mark = u"<!-- end section -->"
    marker = 0
    new_text = []
    disambiglinks = {}
    total_count = [0, 0, 0, 0]
    sections = []
    diffs = []
    while True:
        section_start = m_text.find(start_mark, marker)
        if section_start == -1:
            break
        title_mark = section_start + len(start_mark)
        section_title = m_text[title_mark:
                               m_text.find(u" -->\n", title_mark)]
        section_marker = title_mark + len(section_title) + len(" -->\n")
        if section_marker >= len(m_text):
            wikipedia.output(
                u"ERROR: cannot locate section title in %s" % section_title)
            raise RuntimeError
        section_end = m_text.find(end_mark, section_marker)
        if section_end == -1:
            wikipedia.output(
                u"ERROR: cannot locate end of section %s" % section_title)
            raise RuntimeError
        marker = section_end
        sections.append((section_title, section_marker, section_end))
        sectionnumber = len(sections) - 1
        for item in active_r.finditer(m_text, section_marker, section_end):
            link_page_title = item.group(1)
            link_page = getPage(link_page_title)
            try:
                prefetch(link_page)
                while link_page.isRedirectPage():
                    link_page = link_page.getRedirectTarget()
                    prefetch(link_page)
                if not link_page.isDisambig():
                    continue
            except wikipedia.NoPage:
                continue
            link_page_title = link_page.sectionFreeTitle()
            if link_page_title in disambiglinks.keys():
                continue
            count = refcount(link_page)
            wikipedia.output(u"%s [%i]" % (link_page.title(), count))
            if item.group(3):
                history = item.group(3)
            else:
                history = u''
            disambiglinks[link_page_title] = {
                'section': sectionnumber,
                'title': link_page_title,
                'count': count,
                'history_text': history,
                'trailing_text': item.group(4).strip()
            }
        # search for inactive listings, which should always follow active ones
        for item in inactive_r.finditer(m_text, section_marker, section_end):
            link_page_title = item.group(1)
            link_page = getPage(link_page_title)
            try:
                prefetch(link_page)
                while link_page.isRedirectPage():
                    link_page = link_page.getRedirectTarget()
                    prefetch(link_page)
                if not link_page.isDisambig():
                    continue
            except wikipedia.NoPage:
                continue
            link_page_title = link_page.title()
            if link_page_title in disambiglinks.keys():
                continue
            count = refcount(link_page)
            wikipedia.output(u"%s [%i]" % (link_page.title(), count))
            if item.group(3):
                history = item.group(3)
            else:
                history = u''
            disambiglinks[link_page_title] = {
                'section': sectionnumber,
                'title': link_page_title,
                'count': count,
                'history_text': history,
                'trailing_text': item.group(4).strip()
            }

    # Step 2. Collect links from data dump output page and add any that
    # aren't already in the collection
    for link_page in dump_page.linkedPages():
        try:
            prefetch(link_page)
            while link_page.isRedirectPage():
                link_page = link_page.getRedirectTarget()
                prefetch(link_page)
            if not link_page.isDisambig():
                continue
        except wikipedia.NoPage:
            continue
        link_page_title = link_page.sectionFreeTitle()
        if link_page_title in disambiglinks.keys():
            continue
        count = refcount(link_page)
        wikipedia.output(u"%s [%i]" % (link_page.title(), count))
        history = u''
        disambiglinks[link_page_title] = {
            'section': 0,  # All new articles go into 'general' until classified
            'title': link_page_title,
            'count': count,
            'history_text': history,
            'trailing_text': u''
        }
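    # Illustration only (all titles and numbers here are hypothetical): after
    # Steps 1 and 2, each disambiglinks entry has roughly this shape. 'count'
    # is the freshly measured backlink total; 'history_text' still holds the
    # counts recorded on previous runs, newest first, until Step 3 prepends
    # the new count.
    #   disambiglinks[u'Mercury'] = {
    #       'section': 0,            # index into sections; 0 ('general') for new pages
    #       'title': u'Mercury',
    #       'count': 262,
    #       'history_text': u'250 238 230',
    #       'trailing_text': u'see talk page',
    #   }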
    # Step 3. Sort links by section and count, and output page
    marker = 0
    for (number, (section_name, section_marker, section_end)
            ) in enumerate(sections):
        section_links = [link for link in disambiglinks.values()
                         if link['section'] == number]
        section_links.sort(key=lambda i: i['count'], reverse=True)
        section_count = [0, 0]
        new_text.append(m_text[marker:section_marker])
        active = True
        for link in section_links:
            if link['count'] < ACTIVE_CUTOFF and active:
                active = False
                new_text.append(u"<!-- Inactive articles:\n")
            if link['history_text']:
                history = [int(n) for n in link['history_text'].split(" ")]
            else:
                history = []
            history = [link['count']] + history
            while len(history) > HISTORY_LEN:
                del history[-1]
            if len(history) == 1:
                link['diff'] = 'new'
            else:
                link['diff'] = "%+i" % (history[0] - history[1])
                diffs.append((history[0] - history[1], link['title']))
                if history[0] < history[1]:
                    fixed_pages += 1
                    fixed_links += (history[1] - history[0])
            link['history_text'] = " ".join(str(x) for x in history)
            ## print link[1]+":", history
            if max(history) < ACTIVE_CUTOFF / 4:
                # discard items that have no significant history
                continue
            if active:
                section_count[0] += 1
                section_count[1] += link['count']
                item = (
                    u"[[%(title)s]] (%(count)i [[Special:Whatlinkshere/%(title)s|links]]) "
                    + u"(%(diff)s)<!-- history %(history_text)s--> %(trailing_text)s"
                    ) % link
                # bullet items that have shown unusual or persistent increases
                if (len(history) > 1
                        and history[0] - history[1] > ACTIVE_CUTOFF / 2) or (
                        len(history) == HISTORY_LEN
                        and increasing(history)
                        and history[0] - history[-1] > ACTIVE_CUTOFF):
                    prefix = "'''• "
                    suffix = "'''"
                    item = item.rstrip("'")
                    problems.append(
                        u"* [[%(title)s]] (%(count)i [[Special:Whatlinkshere/%(title)s|links]]) (%(diff)s)\n"
                        % link)
                else:
                    prefix = suffix = ""
                new_text.append("# %s%s%s\n" % (prefix, item, suffix))
            else:
                total_count[2] += 1
                total_count[3] += link['count']
                new_text.append(
                    u"# [[%(title)s]] (%(count)i) history %(history_text)s: %(trailing_text)s\n"
                    % link)
        if not active:
            new_text.append("-->\n")
        marker = section_end
        new_text.append(
            u"\n Section '%s' contains %i links to %i active articles.\n"
            % (section_name, section_count[1], section_count[0]))
        total_count[0] += section_count[0]
        total_count[1] += section_count[1]

    diffs.sort()
    statistics_point = m_text.find(u"|}")
    if statistics_point >= 0:
        text = m_text[marker:statistics_point]
        text = re.sub(r"(?s)<!--banner-->.*?<!--/banner-->",
                      """<!--banner-->
'''''Since last week, at least %s links to %s pages have been fixed!'''''
<!--/banner-->""" % (fmt(fixed_links), fmt(fixed_pages)),
                      text)
        top10 = ["\n===Top 10 increases==="]
        for item in reversed(diffs[-10:]):
            top10.append("# [[%s]] (%i)" % (item[1], item[0]))
        top10.append("===Top 10 decreases===")
        for item in diffs[:10]:
            top10.append("# [[%s]] (%i)" % (item[1], item[0]))
        top10.append("<!--/banner-->")
        text = text.replace("<!--/banner-->", "\n".join(top10))
        new_text.append(text)
        marker = statistics_point
    new_text.append(u"|-\n")
    today = datetime.date.today()
    new_text.append(u"| %4i-%02i-%02i || %s || %s || %s || %s\n"
                    % (today.year, today.month, today.day,
                       fmt(total_count[0] + total_count[2]),
                       fmt(total_count[0]),
                       fmt(total_count[1] + total_count[3]),
                       fmt(total_count[1])))
    new_text.append(m_text[marker:])
    wikipedia.setAction(u"Disambiguation page maintenance script")
    result_page.put(u"".join(new_text))

    prob_text = problem_page.get()
    header_start = prob_text.index("<noinclude>")
    header_end = prob_text.index("</noinclude>") + len("</noinclude>")
    problem_result.put(prob_text[header_start:header_end] + "\n" +
                       u"".join(problems))
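    # For reference, with hypothetical titles and numbers: inactive entries
    # are written inside the <!-- Inactive articles: ... --> comment block as
    #   # [[Mercury]] (75) history 75 80 82: see talk page
    # and the statistics table gains one row per run, e.g.
    #   |-
    #   | 2009-01-01 || 1,234 || 567 || 123,456 || 98,765
    # (date, total pages, active pages, total links, active links).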
finally:
    elapsed = datetime.datetime.now() - started_at
    print "elapsed time = " + str(elapsed)
    wikipedia.stopme()