
User:InfoboxBot/wikipedia edit pages clean.py


The following code was developed for Python 3.6 and may not work on earlier versions of Python. It is designed to work in conjunction with a pre-existing MySQL database of infobox data scraped from enwiki, which is populated by several other, entirely separate scripts that are not currently open source or available for public viewing (although I may eventually make the whole thing properly and fully open source).

The following is a static snapshot of the bot's core code, which is continually evolving and changes at least slightly with every specific task it runs. It is complete in the sense that it was the entirety of the program at one point in time, for one specific series of edits, but it will never be perfectly current: I will only update this page periodically with new versions of the code, not every single time it changes.
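
For reference, here is a minimal sketch (an assumption, not part of the bot) of the `data` table that the SELECT and UPDATE queries below rely on. Column types and keys are guesses reconstructed from those queries; the real schema used by the unpublished scraper scripts may differ.

import pymysql

# Hypothetical setup for the scraped-infobox table assumed by the bot code below.
connection = pymysql.connect(host="localhost", user="example_user", password="example_password",
							db="infobox_power_station", charset="utf8mb4")
try:
	with connection.cursor() as cursor:
		cursor.execute("""
			CREATE TABLE IF NOT EXISTS `data` (
				`pageid` INT UNSIGNED NOT NULL,  -- enwiki page ID
				`title`  VARCHAR(255) NOT NULL,  -- page title
				`key`    VARCHAR(255) NOT NULL,  -- infobox parameter name
				`value`  TEXT,                   -- infobox parameter value
				PRIMARY KEY (`pageid`, `key`)
			) CHARACTER SET utf8mb4
		""")
	connection.commit()
finally:
	connection.close()

The bot's own code follows.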

import urllib.parse
import json
import requests
import time
import glob, os
import mwparserfromhell
import re
import pymysql.cursors
import sys

# Set up session object for all requests
s = requests.Session() 
# NOTE: All cookies received will be stored in the session object

headers = {
	'User-Agent': 'enwiki InfoboxBot power/dam infobox editor by Garzfoth, v1.0b' # UPDATE WITH CHANGES
}
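# The User-Agent above follows the Wikimedia User-Agent policy, which asks API clients
# to identify the tool and its operator.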

######################################################################################
# ==> REMEMBER TO UPDATE THE DATABASE STRING IF CHANGING THE TEMPLATE SET <==
######################################################################################
database = "infobox_power_station"
database_host = "localhost"
database_user = "" # CONFIDENTIAL (REMOVED)
database_password = "" # CONFIDENTIAL (REMOVED)
database_charset = "utf8mb4"

login_username = "" # CONFIDENTIAL (REMOVED)
login_botpassword = "" # CONFIDENTIAL (REMOVED)
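# NOTE: the bot authenticates with action=login, which is intended for bot passwords
# created at Special:BotPasswords; login_username/login_botpassword presumably hold those credentials.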

######################################################################################
# ==> REMEMBER TO SET THE KEY / PREVALUE / VALUE / LIMIT <==
######################################################################################
ib_key = "th_fuel_primary"
ib_prevalue = "Natural gas"
ib_value = "[[Natural gas]]"
sql_limit = 5

if sql_limit > 0:
	sql_post = " LIMIT "+str(sql_limit)
else:
	sql_post = ""

connection = pymysql.connect(host=database_host,
							user=database_user,
							password=database_password,
							db=database,
							charset=database_charset,
							cursorclass=pymysql.cursors.DictCursor)
try:
	with connection.cursor() as cursor:
		# Execute SQL
		######################################################################################
		# ==> REMEMBER TO UPDATE THE SQL QUERY IF NECESSARY <==
		######################################################################################
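		# NOTE: ib_key and ib_prevalue are trusted constants defined above, not user input,
		# so string concatenation is tolerable here; the UPDATE further down uses a parameterized query.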
		sql = "SELECT pageid, title FROM `data` WHERE `key` = '"+ib_key+"' AND `value` = '"+ib_prevalue+"'"+sql_post
		cursor.execute(sql)
		result = cursor.fetchall()
finally:
	connection.close()

######################################################################################
# ==> REMEMBER TO UPDATE THE EDIT SUMMARY IF NECESSARY <==
######################################################################################
editsummary = "Automated edit: fixing infobox parameter \""+ib_key+"\""


print("----------")
def print_summary():
	print("database: "+database)
	print("SQL: "+sql)
	print("ib_key: "+ib_key)
	print("ib_prevalue: "+ib_prevalue)
	print("ib_value: "+ib_value)
	print("sql_limit: "+str(sql_limit))
	print("Transformation: "+ib_prevalue+" => "+ib_value)
	print("Edit summary: "+editsummary)
print_summary()
print("----------")
print("If the above values are incorrect, please press Ctrl+C within 10 seconds...")
'''n = 10 # seconds
n = n * 2 # half-second increments
for i in range(n):
	sys.stdout.write('\r')
	j = (i + 1) / n
	sys.stdout.write("[%-20s] %d%%" % ('='*int(20*j), 100*j))
	sys.stdout.flush()
	time.sleep(0.5) # half-second increments
sys.stdout.write("\n")
sys.stdout.flush()'''

no = 10 # seconds
increments = 0.5 # half-second increments
nf = no * (1 / increments)
n = int(nf)
for i in range(1, n):
	sys.stdout.write('\r')
	j = (i + 1) / n
	remaining = no - (i * increments)
	sys.stdout.write("[%-20s] wait %.2f seconds (%d%%)" % ('='*int(20*j), remaining, 100*j))
	sys.stdout.flush()
	time.sleep(increments)
sys.stdout.write('\r')
sys.stdout.write("[%-20s] wait %.2f seconds (%d%%)" % ('='*20, 0.00, 100))
sys.stdout.write("\n")
sys.stdout.flush()
print("Main program starting...")
print("----------")

# Start login process
# Acquire login token
query = {
	"action": "query",
	"format": "json",
	"meta": "tokens",
	"type": "login"
}
encodedquery = urllib.parse.urlencode(query)
baseurl = "https://en.wikipedia.org/w/api.php?"
#print(baseurl+encodedquery)
login1 = s.get(baseurl+encodedquery, headers=headers)
print("Login #1 (login token): " + login1.json()["query"]["tokens"]["logintoken"])
# login1["token"]
# Log in
query = {
	"action": "login",
	"format": "json",
	"lgname": login_username
}
querypost = {
	"action": "login",
	"format": "json",
	"lgname": login_username,
	"lgpassword": login_botpassword,
	"lgtoken": login1.json()["query"]["tokens"]["logintoken"]
}
encodedquery = urllib.parse.urlencode(query)
#print(baseurl+encodedquery)
login2 = s.post("https://en.wikipedia.org/w/api.php", data=querypost, headers=headers)
#print("Login #2: " + login2.json()["login"]["result"])
print(login2.json())
print("----------")

def allow_bots(text, user):
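	"""Exclusion compliance check (see Template:Bots for the {{bots}}/{{nobots}} mechanism).

	Returns True if `user` is allowed to edit the page whose wikitext is `text`,
	and False if the page opts out via {{nobots}} or a matching {{bots|deny=...}}.
	"""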
	user = user.lower().strip()
	text = mwparserfromhell.parse(text)
	for tl in text.filter_templates():
		if tl.name.matches(['bots', 'nobots']):
			break
	else:
		return True
	for param in tl.params:
		bots = [x.lower().strip() for x in param.value.split(",")]
		if param.name == 'allow':
			if ''.join(bots) == 'none': return False
			for bot in bots:
				if bot in (user, 'all'):
					return True
		elif param.name == 'deny':
			if ''.join(bots) == 'none': return True
			for bot in bots:
				if bot in (user, 'all'):
					return False
	if (tl.name.matches('nobots') and len(tl.params) == 0):
		return False
	return True

iteration = 0
result_count = len(result)

for item in result:
	iteration = iteration + 1
	print("["+str(iteration)+"/"+str(result_count)+"]: " + item["title"] + " - " + str(item["pageid"]))
	# Acquire content/timestamp for page to edit
	query = {
		"action": "query",
		"format": "json",
		"curtimestamp": 1,
		"prop": "revisions",
		"pageids": item["pageid"],
		"rvprop": "content|timestamp"
	}
	encodedquery = urllib.parse.urlencode(query)
	#print(baseurl+encodedquery)
	response = s.get(baseurl+encodedquery, headers=headers)
	if allow_bots(response.json()["query"]["pages"][str(item["pageid"])]["revisions"][0]["*"], "InfoboxBot"):
		print("Bot allowed!")
		wikicode = mwparserfromhell.parse(response.json()["query"]["pages"][str(item["pageid"])]["revisions"][0]["*"])
		templates = wikicode.filter_templates()
		#tpl = next(x for x in templates if x.startswith("{{Infobox power station") or x.startswith("{{infobox power station") or x.startswith("{{Infobox power plant") or x.startswith("{{infobox power plant") or x.startswith("{{Infobox wind farm") or x.startswith("{{infobox wind farm") or x.startswith("{{Infobox nuclear power station") or x.startswith("{{infobox nuclear power station"))
		######################################################################################
		# ==> REMEMBER TO SET THE TEMPLATE SET BEING USED <==
		######################################################################################
		tpl = next(x for x in templates if x.name.matches(["Infobox power station", "Infobox power plant", "Infobox wind farm", "Infobox nuclear power station"]))
		#tpl = next(x for x in templates if x.name.matches(["Infobox dam", "Infobox hydroelectric power station"]))
		#length = len(tpl.params)
		#print("Params found: "+str(length))
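		# mwparserfromhell's Template.add() replaces the value of an existing parameter
		# (or appends the parameter if it is missing), preserving the template's formatting.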
		tpl.add(ib_key, ib_value)
		#print(str(wikicode))
		# response.json()["curtimestamp"]
		# response.json()["query"]["pages"][str(item["pageid"])]["revisions"][0]["timestamp"]
		# Obtain CSRF token
		query = {
			"action": "query",
			"format": "json",
			"meta": "tokens",
			"type": "csrf"
		}
		encodedquery = urllib.parse.urlencode(query)
		csrf = s.get(baseurl+encodedquery, headers=headers)
		# Make the edit (but first, some useful comments)
		# https://www.mediawiki.org/wiki/API:Assert
		# assert = "user" / "bot" ==> request will fail if user is not logged in / if user is not a bot
		# https://www.mediawiki.org/wiki/API:Edit
		# summary = "edit summary"
		# minor = 1 ==> marks the edit as minor
		# notminor = 1 ==> "If set, don't mark the edit as minor, even if you have the "Mark all my edits minor by default" preference enabled"
		# bot = 1 ==> marks the edit as bot-made
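		# basetimestamp / starttimestamp ==> used by the API to detect edit conflicts: the edit fails if the page was changed after the revision we fetched
		# nocreate = 1 ==> fail instead of creating the page if it somehow does not exist
		# watchlist = "nochange" ==> leave the bot account's watchlist unchanged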
		querypost = {
			"action": "edit",
			"assert": "user",
			"format": "json",
			"pageid": item["pageid"],
			"text": str(wikicode),
			"summary": editsummary,
			"minor": 1,
			"basetimestamp": response.json()["query"]["pages"][str(item["pageid"])]["revisions"][0]["timestamp"],
			"starttimestamp": response.json()["curtimestamp"],
			"nocreate": 1,
			"watchlist": "nochange",
			"token": csrf.json()["query"]["tokens"]["csrftoken"]
		}
		finalresult = s.post("https://en.wikipedia.org/w/api.php", data=querypost, headers=headers)
		print(finalresult.json())
		# Connect to the database
		connection = pymysql.connect(host=database_host,
									user=database_user,
									password=database_password,
									db=database,
									charset=database_charset,
									cursorclass=pymysql.cursors.DictCursor)
		try:
			with connection.cursor() as cursor:
				# Update the record
				sql = "UPDATE `data` SET `value` = %s WHERE `pageid` = %s AND `title` = %s AND `key` = %s"
				cursor.execute(sql, (ib_value, item["pageid"], item["title"], ib_key))
			# connection is not autocommit by default. So you must commit to save your changes.
			connection.commit()
		finally:
			connection.close()
		print("Updated in DB.")
		if iteration != result_count:
			# Let's not kill the Wikipedia API
			#time.sleep(10)
			no = 10 # seconds
			increments = 0.5 # half-second increments
			nf = no * (1 / increments)
			n = int(nf)
			for i in range(1, n):
				sys.stdout.write('\r')
				j = (i + 1) / n
				remaining = no - (i * increments)
				sys.stdout.write("[%-20s] wait %.2f seconds (%d%%)" % ('='*int(20*j), remaining, 100*j))
				sys.stdout.flush()
				time.sleep(increments)
			sys.stdout.write('\r')
			sys.stdout.write("[%-20s] wait %.2f seconds (%d%%)" % ('='*20, 0.00, 100))
			sys.stdout.write("\n")
			sys.stdout.flush()
	else:
		print("Bot not allowed!")
print("----------")
print("Done!")
print("----------")
print_summary()
print("----------")