You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
122 lines
3.4 KiB
122 lines
3.4 KiB
4 years ago
|
#!/usr/bin/python3
|
||
|
|
||
|
import asyncio
import configparser
import html
import json
import logging
import pathlib

import discord
import persistqueue
import tweepy
|
||
|
|
||
|
|
||
|
# Delay, in seconds, passed to asyncio.sleep() between queue polls.
TIMEOUT = 1

# URL prefixes; the numeric user / status id string is appended to them.
TWITTER_USER_URL = "https://twitter.com/i/user/"
TWITTER_STATUS_URL = "https://twitter.com/i/status/"

# Icon used in the embed footer and the accent colour of the embed.
TWITTER_ICON_URL = "https://abs.twimg.com/icons/apple-touch-icon-192x192.png"
TWITTER_COLOR = 0x1DA1F2
|
||
|
|
||
|
|
||
|
# Send the 'discord' logger's records (INFO and above) to a stream handler.
logger = logging.getLogger('discord')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# Settings are read from 'settings.cfg', resolved against the working
# directory. ConfigParser.read() ignores a missing file, so a missing or
# incomplete file surfaces as an error from the lookups below.
config = configparser.ConfigParser()
config.read('settings.cfg')

# [General]
storage_path = config.get('General', 'storage_path')

# [Discord]
token = config.get('Discord', 'token')
channel_id = config.getint('Discord', 'channel_id')

# [Twitter]
consumer_key = config.get('Twitter', 'consumer_key')
consumer_secret = config.get('Twitter', 'consumer_secret')
|
||
|
|
||
|
|
||
|
class Bot(discord.Client):
    """Discord client that posts queued Twitter events to one channel.

    Uses the module-level settings ``storage_path``, ``channel_id``,
    ``consumer_key`` and ``consumer_secret``.
    """

    def get_api(self, user_id):
        """Return a ``tweepy.API`` authorised for ``user_id``, or ``None``.

        Credentials are read from ``tokens.json`` inside ``storage_path`` on
        every call. The stored value is unpacked into
        ``OAuthHandler.set_access_token``, so it is presumably a
        ``[token, secret]`` pair -- confirm against whatever writes the file.

        :param user_id: Twitter user id; ints are accepted and stringified.
        :returns: an API client, or ``None`` if no credentials are stored.
        :raises OSError: if ``tokens.json`` cannot be opened.
        :raises json.JSONDecodeError: if the file is not valid JSON.
        """
        tokens_file = pathlib.Path(storage_path, 'tokens.json')
        with open(tokens_file, 'r', encoding='utf-8') as f:
            tokens = json.load(f)
        # JSON object keys are always strings, so an integer id would never
        # match; normalise before the lookup.
        access_token = tokens.get(str(user_id))
        if not access_token:
            return None
        handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
        handler.set_access_token(*access_token)
        return tweepy.API(handler)

    def make_embed(self, tweet):
        """Build the Discord embed for a single tweet payload.

        :param tweet: tweet dict; reads ``id_str``, ``text``,
            ``extended_tweet.full_text`` and the ``user`` sub-dict
            (``id_str``, ``screen_name``, ``name``,
            ``profile_image_url_https``).
        :returns: a ``discord.Embed`` whose description is the tweet text
            followed by a link to the tweet.
        """
        # ``or {}`` also covers a key that is present but null.
        user = tweet.get('user') or {}
        extended = tweet.get('extended_tweet') or {}

        tweet_url = '{0}{1}'.format(TWITTER_STATUS_URL, tweet.get('id_str'))
        author_url = '{0}{1}'.format(TWITTER_USER_URL, user.get('id_str'))
        author_handle = '@{0}'.format(user.get('screen_name'))

        # Long tweets carry a truncated ``text``; the complete body lives in
        # ``extended_tweet.full_text``. Twitter HTML-escapes &, < and > in
        # tweet text, which Discord would otherwise show literally.
        text = extended.get('full_text') or tweet.get('text') or ''
        description = '{0}\n[{1}]({1})'.format(html.unescape(text), tweet_url)

        embed = discord.Embed(
            description=description,
            url=tweet_url,
            color=TWITTER_COLOR
        )

        author = {
            'name': '{0} ({1})'.format(user.get('name'), author_handle),
            'url': author_url,
        }
        # Only pass an icon when there is one, so a missing avatar never
        # reaches the embed as a bogus URL value.
        icon_url = user.get('profile_image_url_https')
        if icon_url:
            author['icon_url'] = icon_url
        embed.set_author(**author)

        embed.set_footer(
            text='Twitter',
            icon_url=TWITTER_ICON_URL
        )
        return embed

    async def process_event(self, data):
        """Post every tweet of one queued event to the configured channel.

        Events without ``tweet_create_events``, without ``for_user_id``, or
        for a user with no stored credentials are ignored.

        :param data: event dict taken from the queue.
        """
        tweets = data.get('tweet_create_events', [])
        if not tweets:
            return
        user_id = data.get('for_user_id')
        if not user_id:
            return
        # The client object itself is not used; the lookup acts as a filter
        # so only events for users with stored credentials are relayed.
        if not self.get_api(user_id):
            return

        await self.wait_until_ready()
        channel = self.get_channel(channel_id)
        if not channel:
            logger.warning(
                'Channel %s not found; dropping %d tweet(s)',
                channel_id, len(tweets)
            )
            return
        for tweet in tweets:
            await channel.send(embed=self.make_embed(tweet))
|
||
|
|
||
|
|
||
|
def main():
    """Run the Discord bot and the queue poller until the process is stopped.

    Two long-running tasks share one event loop: ``poll_queue`` feeds events
    from the persistent queue at ``storage_path`` to the bot, and ``run_bot``
    keeps the Discord connection alive.
    """
    bot = Bot()

    async def poll_queue():
        """Take events off the queue one by one and hand them to the bot."""
        queue = persistqueue.SQLiteQueue(storage_path)
        while True:
            try:
                data = queue.get(block=False)
            except persistqueue.exceptions.Empty:
                await asyncio.sleep(TIMEOUT)
                continue

            try:
                await bot.process_event(data)
            except asyncio.CancelledError:
                # On Python < 3.8 CancelledError is an Exception subclass;
                # never swallow a cancellation.
                raise
            except Exception:
                # Re-raising here would end this task for good (nothing
                # awaits it), silently stopping all further polling. Put the
                # event back, log the failure and carry on after a pause.
                logger.exception('Failed to process event; re-queueing it')
                queue.put(data)
                queue.task_done()
                await asyncio.sleep(TIMEOUT)

    async def run_bot():
        """Keep the bot connected, restarting it whenever ``start`` ends."""
        while True:
            try:
                await bot.start(token)
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception('Discord connection failed; restarting')
            # close() replaces the deprecated logout(). clear() re-opens the
            # client so the next start() actually reconnects instead of
            # returning immediately on a closed client.
            await bot.close()
            bot.clear()
            # Pause between attempts so a persistent failure cannot spin.
            await asyncio.sleep(TIMEOUT)

    loop = asyncio.get_event_loop()
    try:
        # Hold references to the tasks: the loop only keeps weak references,
        # so an unreferenced task may be garbage-collected mid-run.
        tasks = [
            loop.create_task(poll_queue()),
            loop.create_task(run_bot()),
        ]
        loop.run_forever()
    finally:
        loop.close()
|
||
|
|
||
|
|
||
|
# Start the bridge only when this file is executed as a script; importing
# the module must not start the bot.
if __name__ == "__main__":
    main()
|