|
"""
|
|
Simple script that automates Twitter sharing from a WordPress site.
|
|
"""
|
|
|
|
from dataclasses import dataclass
|
|
from email.mime.text import MIMEText
|
|
from smtplib import SMTP, SMTP_SSL
|
|
from traceback import format_exc
|
|
import argparse
|
|
|
|
import pymysql
|
|
import tweepy # type: ignore
|
|
|
|
|
|
@dataclass
|
|
class Config:
|
|
# If `test` is True, no tweet nor email is sent.
|
|
test: bool
|
|
# Shared posts do not repeat until reaching this amount.
|
|
repeat_limit: int
|
|
# WordPress MySQL database credentials.
|
|
db_host: str
|
|
db_port: int
|
|
db_user: str
|
|
db_password: str
|
|
db_name: str
|
|
tw_consumer_key: str
|
|
tw_consumer_secret: str
|
|
tw_access_token: str
|
|
tw_access_token_secret: str
|
|
# SMTP credentials to send an email when an error occurs.
|
|
email_to: str
|
|
email_from: str
|
|
email_password: str
|
|
smtp_host: str
|
|
smtp_port: int
|
|
# If `smtp_tls` is False, SSL is used instead.
|
|
smtp_tls: bool
|
|
|
|
|
|
def send_error_email(config: Config) -> None:
|
|
message = format_exc()
|
|
mime_message = MIMEText(message, "plain")
|
|
mime_message["From"] = config.email_from
|
|
mime_message["To"] = config.email_to
|
|
mime_message["Subject"] = "Post Sharing Script Failed"
|
|
if config.smtp_tls:
|
|
smtp = SMTP(config.smtp_host, port=config.smtp_port)
|
|
smtp.starttls()
|
|
else:
|
|
smtp = SMTP_SSL(config.smtp_host, port=config.smtp_port)
|
|
smtp.login(config.email_from, config.email_password)
|
|
if config.test:
|
|
print(mime_message)
|
|
else:
|
|
smtp.sendmail(
|
|
config.email_from,
|
|
config.email_to,
|
|
mime_message.as_string()
|
|
)
|
|
smtp.quit()
|
|
|
|
|
|
def run(config: Config) -> None:
|
|
conn = pymysql.connect(
|
|
host=config.db_host,
|
|
port=config.db_port,
|
|
user=config.db_user,
|
|
passwd=config.db_password,
|
|
db=config.db_name
|
|
)
|
|
cursor = conn.cursor()
|
|
# Create a table where to store the latest shared posts.
|
|
# Necessary to prevent repeats.
|
|
cursor.execute(
|
|
"CREATE TABLE IF NOT EXISTS autoshare_lastshared ("
|
|
" post_id bigint(20)"
|
|
")"
|
|
)
|
|
conn.commit()
|
|
# Get a random post that has not been shared in the last
|
|
# `REPEAT_LIMIT` times. The query is a little complex
|
|
# because posts permalinks are not stored in the database,
|
|
# so they need to be built in the same query.
|
|
# Largely based on https://www.daveheavyindustries.com/2011/02/08/wordpress-permalink-via-sql/.
|
|
cursor.execute("""\
|
|
SELECT
|
|
wpp.ID,
|
|
wpp.post_title,
|
|
CONCAT(
|
|
wpo_su.option_value,
|
|
REPLACE(
|
|
REPLACE(
|
|
REPLACE(
|
|
REPLACE(
|
|
REPLACE(
|
|
wpo.option_value,
|
|
'%year%',
|
|
date_format(wpp.post_date, '%Y')
|
|
),
|
|
'%monthnum%',date_format(wpp.post_date, '%m')
|
|
),
|
|
'%day%', date_format(wpp.post_date, '%d')
|
|
),
|
|
'%postname%', wpp.post_name
|
|
),
|
|
'%category%', wpc.slug
|
|
)
|
|
)
|
|
as permalink
|
|
FROM wp_posts wpp
|
|
INNER JOIN wp_options wpo on wpo.option_name='permalink_structure'
|
|
INNER JOIN wp_options wpo_su on wpo_su.option_name='siteurl'
|
|
INNER JOIN (
|
|
select wtr.object_id ID, max(wpt.slug) slug
|
|
from wp_term_relationships wtr
|
|
inner join wp_term_taxonomy wtt on
|
|
wtt.term_taxonomy_id=wtr.term_taxonomy_id and wtt.taxonomy='category'
|
|
inner join wp_terms wpt on wpt.term_id=wtt.term_id
|
|
group by wtr.object_id
|
|
) wpc on wpc.ID=wpp.ID
|
|
WHERE wpp.post_type = 'post' AND
|
|
wpp.post_status = 'publish' AND
|
|
NOT EXISTS (SELECT 1 FROM autoshare_lastshared shared
|
|
WHERE wpp.ID = shared.post_id)
|
|
ORDER BY RAND()
|
|
LIMIT 1\
|
|
""")
|
|
post = cursor.fetchone()
|
|
# Make sure the posts table is not empty.
|
|
if post is not None:
|
|
post_id: int
|
|
post_title: str
|
|
permalink: str
|
|
post_id, post_title, permalink = post
|
|
if not config.test:
|
|
# Add the selected post to the latest shared posts.
|
|
cursor.execute(
|
|
"INSERT INTO autoshare_lastshared VALUES (%s)",
|
|
post_id
|
|
)
|
|
cursor.execute("SELECT COUNT(*) FROM autoshare_lastshared")
|
|
if (result := cursor.fetchone()) is None:
|
|
raise RuntimeError("SQL query failed.")
|
|
count: int = result[0]
|
|
# Once reached the `config.repeat_limit`, a post that
|
|
# has already been shared might be shared again.
|
|
if count >= config.repeat_limit:
|
|
cursor.execute("TRUNCATE TABLE autoshare_lastshared")
|
|
# Create and send the tweet.
|
|
tweet = f"{post_title} {permalink}"
|
|
auth = tweepy.OAuthHandler(
|
|
config.tw_consumer_key,
|
|
config.tw_consumer_secret
|
|
)
|
|
auth.set_access_token(
|
|
config.tw_access_token,
|
|
config.tw_access_token_secret
|
|
)
|
|
api = tweepy.API(auth)
|
|
if config.test:
|
|
print(tweet)
|
|
else:
|
|
api.update_status(status=tweet)
|
|
conn.commit()
|
|
print("Tweet successfully sent.")
|
|
else:
|
|
print("Nothing to share.")
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("-t", "--test", action="store_true")
|
|
parser.add_argument("--repeat-limit", default=30)
|
|
# Database credentials.
|
|
parser.add_argument("--db-host", default="localhost")
|
|
parser.add_argument("--db-port", default=3306)
|
|
parser.add_argument("--db-user", required=True)
|
|
parser.add_argument("--db-password", required=True)
|
|
parser.add_argument("--db-name", required=True)
|
|
# Twitter API keys.
|
|
parser.add_argument("--tw-consumer-key", required=True)
|
|
parser.add_argument("--tw-consumer-secret", required=True)
|
|
parser.add_argument("--tw-access-token", required=True)
|
|
parser.add_argument("--tw-access-token-secret", required=True)
|
|
# SMTP credentials.
|
|
parser.add_argument("--email-to", required=True)
|
|
parser.add_argument("--email-from", required=True)
|
|
parser.add_argument("--email-password", required=True)
|
|
parser.add_argument("--smtp-host", required=True)
|
|
parser.add_argument("--smtp-port", default=465)
|
|
parser.add_argument("--smtp-tls", action="store_true")
|
|
# This weird initialization is necessary since
|
|
# `parse_args()` does not support a factory class.
|
|
config = Config(
|
|
repeat_limit=0,
|
|
test=False,
|
|
db_host="",
|
|
db_port=0,
|
|
db_user="",
|
|
db_password="",
|
|
db_name="",
|
|
tw_consumer_key="",
|
|
tw_consumer_secret="",
|
|
tw_access_token="",
|
|
tw_access_token_secret="",
|
|
email_to="",
|
|
email_from="",
|
|
email_password="",
|
|
smtp_host="",
|
|
smtp_port=0,
|
|
smtp_tls=False
|
|
)
|
|
parser.parse_args(namespace=config)
|
|
try:
|
|
run(config)
|
|
except Exception:
|
|
send_error_email(config)
|