From 510d2c5115c1507684ea853b9432939d289aabfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Forr=C3=B3?= Date: Wed, 2 Sep 2020 21:58:41 +0200 Subject: [PATCH] Implement Discord bot --- bot.py | 121 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 bot.py diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..8d3a44d --- /dev/null +++ b/bot.py @@ -0,0 +1,121 @@ +#!/usr/bin/python3 + +import asyncio +import configparser +import json +import logging +import pathlib + +import discord +import persistqueue +import tweepy + + +TIMEOUT = 1 + +TWITTER_USER_URL = 'https://twitter.com/i/user/' +TWITTER_STATUS_URL = 'https://twitter.com/i/status/' +TWITTER_ICON_URL = 'https://abs.twimg.com/icons/apple-touch-icon-192x192.png' +TWITTER_COLOR = 0x1da1f2 + + +logger = logging.getLogger('discord') +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +config = configparser.ConfigParser() +config.read('settings.cfg') + + +storage_path = config.get('General', 'storage_path') + +token = config.get('Discord', 'token') +channel_id = config.getint('Discord', 'channel_id') + +consumer_key = config.get('Twitter', 'consumer_key') +consumer_secret = config.get('Twitter', 'consumer_secret') + + +class Bot(discord.Client): + def get_api(self, user_id): + handler = tweepy.OAuthHandler(consumer_key, consumer_secret) + with open(pathlib.Path(storage_path, 'tokens.json'), 'r') as f: + tokens = json.load(f) + access_token = tokens.get(user_id) + if not access_token: + return None + handler.set_access_token(*access_token) + return tweepy.API(handler) + + def make_embed(self, tweet): + tweet_url = '{0}{1}'.format(TWITTER_STATUS_URL, tweet.get('id_str')) + author_url = '{0}{1}'.format(TWITTER_USER_URL, tweet.get('user', {}).get('id_str')) + author_handle = '@{0}'.format(tweet.get('user', {}).get('screen_name')) + description = '{0}\n[{1}]({1})'.format(tweet.get('text'), tweet_url) + embed = discord.Embed( + description=description, + url=tweet_url, + color=TWITTER_COLOR + ) + embed.set_author( + name='{0} ({1})'.format(tweet.get('user', {}).get('name'), author_handle), + url=author_url, + icon_url=tweet.get('user', {}).get('profile_image_url_https') + ) + embed.set_footer( + text='Twitter', + icon_url=TWITTER_ICON_URL + ) + return embed + + async def process_event(self, data): + tweets = data.get('tweet_create_events', []) + if not tweets: + return + user_id = data.get('for_user_id') + if not user_id: + return + api = self.get_api(user_id) + if not api: + return + await self.wait_until_ready() + channel = self.get_channel(channel_id) + if channel: + for tweet in tweets: + await channel.send(embed=self.make_embed(tweet)) + + +def main(): + bot = Bot() + async def poll_queue(): + queue = persistqueue.SQLiteQueue(storage_path) + while True: + try: + data = queue.get(block=False) + except persistqueue.exceptions.Empty: + await asyncio.sleep(TIMEOUT) + else: + try: + await bot.process_event(data) + except Exception: + queue.put(data) + queue.task_done() + raise + async def run_bot(): + while True: + try: + await bot.start(token) + except Exception: + await bot.logout() + loop = asyncio.get_event_loop() + try: + asyncio.ensure_future(poll_queue()) + asyncio.ensure_future(run_bot()) + loop.run_forever() + finally: + loop.close() + + +if __name__ == '__main__': + main()