You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

200 lines
6.2 KiB

#!/usr/bin/python3
import asyncio
import configparser
import json
import logging
import pathlib
import re
import discord
import persistqueue
import tweepy
from youtube import YouTube, YouTubeError
# Seconds to sleep between polls when the persistent queue is empty.
TIMEOUT = 1
# URL prefixes; a Twitter user id / status id string is appended to build links.
TWITTER_USER_URL = 'https://twitter.com/i/user/'
TWITTER_STATUS_URL = 'https://twitter.com/i/status/'
# Icon shown in the footer of Twitter embeds.
TWITTER_ICON_URL = 'https://abs.twimg.com/icons/apple-touch-icon-192x192.png'
# Accent colors used for the Discord embeds of each service.
TWITTER_COLOR = 0x1da1f2
YOUTUBE_COLOR = 0xff0000
# Send the 'discord' logger's output (INFO and above) to a stream handler.
logger = logging.getLogger('discord')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
# Settings are read from 'settings.cfg', resolved relative to the current
# working directory.
config = configparser.ConfigParser()
config.read('settings.cfg')
# Location used for both the persistent event queue and 'tokens.json'.
storage_path = config.get('General', 'storage_path')
# Discord bot token and the ids of the channels that receive relayed events.
token = config.get('Discord', 'token')
twitter_channel_id = config.getint('Discord', 'twitter_channel_id')
youtube_channel_id = config.getint('Discord', 'youtube_channel_id')
# Twitter application credentials passed to tweepy's OAuth handler.
consumer_key = config.get('Twitter', 'consumer_key')
consumer_secret = config.get('Twitter', 'consumer_secret')
# YouTube API key and the id of the YouTube channel whose events are relayed.
yt_api_key = config.get('YouTube', 'api_key')
yt_channel_id = config.get('YouTube', 'channel_id')
class Bot(discord.Client):
    """Discord client that relays Twitter/YouTube events and answers commands.

    External events reach the bot through :meth:`process_event`, which
    dispatches to ``process_<service>_event``.  Chat commands are matched
    against ``self.commands`` in :meth:`on_message` and dispatched to
    ``perform_<command>``.
    """

    def __init__(self):
        # Every pattern must define a ``command`` group; the remaining named
        # groups are passed to the handler as keyword arguments.
        self.commands = (
            re.compile(r'^(?P<prefix>!)(?P<command>reaction)\s+(?P<q>")?(?P<query>.+)(?(q)")$'),
        )
        self.youtube = YouTube(yt_api_key, yt_channel_id)
        super().__init__()

    async def process_event(self, service, data):
        """Dispatch an external event to ``process_<service>_event``.

        Raises:
            AttributeError: if no handler exists for ``service``.
        """
        action = getattr(self, 'process_{0}_event'.format(service))
        await action(data)

    async def on_message(self, message):
        """Run the handler of every command pattern matching the message."""
        for pattern in self.commands:
            m = pattern.match(message.content)
            if not m:
                continue
            d = m.groupdict()
            command = d.pop('command')
            action = getattr(self, 'perform_{0}'.format(command))
            await action(message, **d)

    def get_twitter_api(self, user_id):
        """Return a ``tweepy.API`` for ``user_id``, or ``None`` if unknown.

        The access token is looked up by ``user_id`` in ``tokens.json`` under
        ``storage_path``; the stored value is unpacked into
        ``set_access_token``.
        """
        handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(pathlib.Path(storage_path, 'tokens.json'), 'r', encoding='utf-8') as f:
            tokens = json.load(f)
        access_token = tokens.get(user_id)
        if not access_token:
            return None
        handler.set_access_token(*access_token)
        return tweepy.API(handler)

    def make_twitter_embed(self, tweet):
        """Build a Discord embed from a tweet dictionary."""
        user = tweet.get('user', {})
        tweet_url = '{0}{1}'.format(TWITTER_STATUS_URL, tweet.get('id_str'))
        author_url = '{0}{1}'.format(TWITTER_USER_URL, user.get('id_str'))
        author_handle = '@{0}'.format(user.get('screen_name'))
        # The tweet text is followed by a clickable link to the status itself.
        description = '{0}\n[{1}]({1})'.format(tweet.get('text'), tweet_url)
        embed = discord.Embed(
            description=description,
            url=tweet_url,
            color=TWITTER_COLOR
        )
        embed.set_author(
            name='{0} ({1})'.format(user.get('name'), author_handle),
            url=author_url,
            icon_url=user.get('profile_image_url_https')
        )
        embed.set_footer(
            text='Twitter',
            icon_url=TWITTER_ICON_URL
        )
        return embed

    async def process_twitter_event(self, data):
        """Post each tweet of a Twitter event to the Twitter channel.

        The event is ignored when it carries no ``tweet_create_events``, when
        it contains a ``user_has_blocked`` key, or when no access token is
        stored for its ``for_user_id``.
        """
        tweets = data.get('tweet_create_events', [])
        if not tweets:
            return
        if 'user_has_blocked' in data:
            # ignore mentions
            return
        user_id = data.get('for_user_id')
        if not user_id:
            return
        # Only events for users with a stored access token are relayed.
        if not self.get_twitter_api(user_id):
            return
        await self.wait_until_ready()
        channel = self.get_channel(twitter_channel_id)
        if channel:
            for tweet in tweets:
                await channel.send(embed=self.make_twitter_embed(tweet))

    async def process_youtube_event(self, data):
        """Post a newly announced video of the configured YouTube channel.

        Entries belonging to another channel or lacking a video id are
        ignored.  A ``YouTubeError`` while fetching the video is logged and
        the event is dropped.
        """
        entry = data.get('feed', {}).get('entry', {})
        if entry.get('yt:channelId') != yt_channel_id:
            return
        video_id = entry.get('yt:videoId')
        if not video_id:
            return
        try:
            video = self.youtube.get_video(video_id)
        except YouTubeError as e:
            logger.warning('Could not fetch YouTube video %s: %s', video_id, e)
            return
        # Same as the Twitter handler: before the client is ready the channel
        # lookup returns nothing and the event would be silently lost.
        await self.wait_until_ready()
        channel = self.get_channel(youtube_channel_id)
        if channel:
            # TODO: setup timer for livestreams
            await channel.send('{title}\n{link}'.format(**video))

    def make_youtube_embed(self, playlist):
        """Build a Discord embed describing a YouTube playlist."""
        embed = discord.Embed(
            title=playlist.get('title'),
            description=playlist.get('description'),
            url=playlist.get('link'),
            color=YOUTUBE_COLOR
        )
        embed.set_thumbnail(url=playlist.get('thumbnail_url'))
        embed.set_author(
            name=self.youtube.channel.get('title'),
            url=self.youtube.channel.get('link'),
            icon_url=self.youtube.channel.get('thumbnail_url')
        )
        return embed

    async def perform_reaction(self, message, query, **kwargs):
        """Handle ``!reaction <query>``: search YouTube and reply with the hit.

        Playlists are answered with an embed, anything else with a plain
        ``title`` / ``link`` message.  Search errors and empty results are
        reported back to the author of the message.
        """
        try:
            result = self.youtube.search(query)
        except YouTubeError as e:
            await message.channel.send('Sorry {0}, {1}'.format(message.author.mention, str(e)))
            return
        if not result:
            await message.channel.send('Sorry {0}, nothing found'.format(message.author.mention))
            return
        if result.get('kind') == 'playlist':
            await message.channel.send(embed=self.make_youtube_embed(result))
        else:
            await message.channel.send('{title}\n{link}'.format(**result))
def main():
    """Run the Discord bot together with the persistent event-queue poller.

    Both coroutines are scheduled on the default event loop, which then runs
    until the process is interrupted.
    """
    bot = Bot()
    # Seconds to wait before reconnecting, so a persistent failure (e.g. a bad
    # token) does not turn into a tight loop of login attempts.
    retry_delay = 5

    async def poll_queue():
        """Forward queued ``(service, data)`` items to the bot as they arrive."""
        queue = persistqueue.SQLiteQueue(storage_path)
        while True:
            try:
                data = queue.get(block=False)
            except persistqueue.exceptions.Empty:
                await asyncio.sleep(TIMEOUT)
                continue
            try:
                await bot.process_event(*data)
            except Exception:
                # Put the item back so it is not lost, then stop polling.
                # Log first-hand: this coroutine runs as a fire-and-forget
                # task, so the re-raised exception is otherwise easy to miss.
                queue.put(data)
                queue.task_done()
                logger.exception('Failed to process queued event; it was re-queued')
                raise

    async def run_bot():
        """Keep the bot connected, restarting it whenever its session ends."""
        while True:
            try:
                await bot.start(token)
            except Exception:
                logger.exception('Discord session failed; restarting in %s seconds', retry_delay)
            # close() supersedes the deprecated logout(); clear() re-opens the
            # client so that the next start() actually connects again.
            await bot.close()
            bot.clear()
            await asyncio.sleep(retry_delay)

    loop = asyncio.get_event_loop()
    try:
        asyncio.ensure_future(poll_queue())
        asyncio.ensure_future(run_bot())
        loop.run_forever()
    finally:
        loop.close()
# Start the bot only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()