You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
243 lines
8.3 KiB
243 lines
8.3 KiB
#!/usr/bin/python3
|
|
|
|
import asyncio
|
|
import configparser
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import re
|
|
|
|
import discord
|
|
import persistqueue
|
|
import tweepy
|
|
|
|
from timer import Timer
|
|
from youtube import YouTube, YouTubeError
|
|
|
|
|
|
COUNTDOWN = [30.5, 5.5]
|
|
|
|
TIMEOUT = 1
|
|
|
|
TWITTER_USER_URL = 'https://twitter.com/i/user/'
|
|
TWITTER_STATUS_URL = 'https://twitter.com/i/status/'
|
|
TWITTER_ICON_URL = 'https://abs.twimg.com/icons/apple-touch-icon-192x192.png'
|
|
TWITTER_COLOR = 0x1da1f2
|
|
|
|
YOUTUBE_COLOR = 0xff0000
|
|
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read(os.getenv('CHEDDAR_KNIGHT_CONFIG', 'settings.cfg'))
|
|
|
|
|
|
staging = config.getboolean('General', 'staging', fallback=False)
|
|
debug = config.getboolean('General', 'debug', fallback=False)
|
|
storage_path = config.get('General', 'storage_path')
|
|
|
|
token = config.get('Discord', 'token')
|
|
staging_server_id = config.getint('Discord', 'staging_server_id', fallback=None)
|
|
twitter_channel_id = config.getint('Discord', 'twitter_channel_id')
|
|
youtube_channel_id = config.getint('Discord', 'youtube_channel_id')
|
|
|
|
consumer_key = config.get('Twitter', 'consumer_key')
|
|
consumer_secret = config.get('Twitter', 'consumer_secret')
|
|
|
|
yt_api_key = config.get('YouTube', 'api_key')
|
|
yt_channel_id = config.get('YouTube', 'channel_id')
|
|
|
|
|
|
logger = logging.getLogger('discord')
|
|
logger.addHandler(logging.StreamHandler())
|
|
logger.setLevel(logging.DEBUG if debug else logging.INFO)
|
|
|
|
|
|
class Bot(discord.Client):
|
|
def __init__(self):
|
|
self.commands = (
|
|
re.compile(r'^(?P<prefix>!)(?P<command>reaction)\s+(?P<q>")?(?P<query>.+)(?(q)")$'),
|
|
)
|
|
self.youtube = YouTube(yt_api_key, yt_channel_id)
|
|
self.yt_videos = []
|
|
super().__init__()
|
|
|
|
async def process_event(self, service, data):
|
|
action = getattr(self, 'process_{0}_event'.format(service))
|
|
await action(data)
|
|
|
|
async def on_message(self, message):
|
|
staging_server = message.channel.guild.id == staging_server_id
|
|
if staging and not staging_server or not staging and staging_server:
|
|
return
|
|
for pattern in self.commands:
|
|
m = pattern.match(message.content)
|
|
if not m:
|
|
continue
|
|
d = m.groupdict()
|
|
command = d.pop('command')
|
|
action = getattr(self, 'perform_{0}'.format(command))
|
|
await action(message, **d)
|
|
|
|
def get_twitter_api(self, user_id):
|
|
handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
|
|
with open(pathlib.Path(storage_path, 'tokens.json'), 'r') as f:
|
|
tokens = json.load(f)
|
|
access_token = tokens.get(user_id)
|
|
if not access_token:
|
|
return None
|
|
handler.set_access_token(*access_token)
|
|
return tweepy.API(handler)
|
|
|
|
def make_twitter_embed(self, tweet):
|
|
tweet_url = '{0}{1}'.format(TWITTER_STATUS_URL, tweet.get('id_str'))
|
|
author_url = '{0}{1}'.format(TWITTER_USER_URL, tweet.get('user', {}).get('id_str'))
|
|
author_handle = '@{0}'.format(tweet.get('user', {}).get('screen_name'))
|
|
description = '{0}\n[{1}]({1})'.format(tweet.get('text'), tweet_url)
|
|
embed = discord.Embed(
|
|
description=description,
|
|
url=tweet_url,
|
|
color=TWITTER_COLOR
|
|
)
|
|
embed.set_author(
|
|
name='{0} ({1})'.format(tweet.get('user', {}).get('name'), author_handle),
|
|
url=author_url,
|
|
icon_url=tweet.get('user', {}).get('profile_image_url_https')
|
|
)
|
|
embed.set_footer(
|
|
text='Twitter',
|
|
icon_url=TWITTER_ICON_URL
|
|
)
|
|
return embed
|
|
|
|
async def process_twitter_event(self, data):
|
|
tweets = data.get('tweet_create_events', [])
|
|
if not tweets:
|
|
return
|
|
if 'user_has_blocked' in data:
|
|
# ignore mentions
|
|
return
|
|
user_id = data.get('for_user_id')
|
|
if not user_id:
|
|
return
|
|
api = self.get_twitter_api(user_id)
|
|
if not api:
|
|
return
|
|
await self.wait_until_ready()
|
|
channel = self.get_channel(twitter_channel_id)
|
|
if channel:
|
|
for tweet in tweets:
|
|
await channel.send(embed=self.make_twitter_embed(tweet))
|
|
|
|
def format_remainder(self, seconds):
|
|
minutes = int(seconds / 60)
|
|
hours, minutes = divmod(minutes, 60)
|
|
days, hours = divmod(hours, 24)
|
|
result = ['{0} minute{1}'.format(minutes, 's' if minutes != 1 else '')]
|
|
if hours > 0:
|
|
result.append('{0} hour{1}'.format(hours, 's' if hours != 1 else ''))
|
|
if days > 0:
|
|
result.append('{0} day{1}'.format(days, 's' if days != 1 else ''))
|
|
return ', '.join(reversed(result))
|
|
|
|
async def process_youtube_event(self, data):
|
|
import json; print(json.dumps(data)) # FIXME: remove
|
|
entry = data.get('feed', {}).get('entry', {})
|
|
if entry.get('yt:channelId') != yt_channel_id:
|
|
return
|
|
video_id = entry.get('yt:videoId')
|
|
if not video_id:
|
|
return
|
|
try:
|
|
video = self.youtube.get_video(video_id)
|
|
except YouTubeError as e:
|
|
return
|
|
if not video:
|
|
return
|
|
published = entry.get('published')
|
|
reminder = entry.get('reminder')
|
|
if published and not reminder:
|
|
if [v for v, p in self.yt_videos if v == video_id and p == published]:
|
|
return
|
|
self.yt_videos.append((video_id, published))
|
|
channel = self.get_channel(youtube_channel_id)
|
|
if channel:
|
|
note = ''
|
|
if video.get('live_broadcast') == 'upcoming':
|
|
scheduled_start = video.get('scheduled_start')
|
|
if scheduled_start:
|
|
remaining = (scheduled_start - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
|
|
if not reminder:
|
|
for minutes in COUNTDOWN:
|
|
if remaining > minutes * 60:
|
|
payload = data.copy()
|
|
payload['feed']['entry']['reminder'] = '{0}m'.format(minutes)
|
|
Timer.schedule(scheduled_start - datetime.timedelta(minutes=minutes), payload)
|
|
note = 'Live in {0}!\n'.format(self.format_remainder(remaining))
|
|
await channel.send('@everyone {note}**{title}**\n{link}'.format(note=note, **video))
|
|
|
|
def make_youtube_embed(self, playlist):
|
|
embed = discord.Embed(
|
|
title=playlist.get('title'),
|
|
description=playlist.get('description'),
|
|
url=playlist.get('link'),
|
|
color=YOUTUBE_COLOR
|
|
)
|
|
embed.set_thumbnail(url=playlist.get('thumbnail_url'))
|
|
embed.set_author(
|
|
name=self.youtube.channel.get('title'),
|
|
url=self.youtube.channel.get('link'),
|
|
icon_url=self.youtube.channel.get('thumbnail_url')
|
|
)
|
|
return embed
|
|
|
|
async def perform_reaction(self, message, query, **kwargs):
|
|
try:
|
|
result = self.youtube.search(query)
|
|
except YouTubeError as e:
|
|
await message.channel.send('Sorry {0}, {1}'.format(message.author.mention, str(e)))
|
|
return
|
|
if not result:
|
|
await message.channel.send('Sorry {0}, nothing found'.format(message.author.mention))
|
|
return
|
|
if result.get('kind') == 'playlist':
|
|
await message.channel.send(embed=self.make_youtube_embed(result))
|
|
else:
|
|
await message.channel.send('{title}\n{link}'.format(**result))
|
|
|
|
|
|
def main():
|
|
bot = Bot()
|
|
async def poll_queue():
|
|
queue = persistqueue.SQLiteQueue(storage_path)
|
|
while True:
|
|
try:
|
|
data = queue.get(block=False)
|
|
except persistqueue.exceptions.Empty:
|
|
await asyncio.sleep(TIMEOUT)
|
|
else:
|
|
try:
|
|
await bot.process_event(*data)
|
|
except Exception:
|
|
queue.put(data)
|
|
queue.task_done()
|
|
raise
|
|
async def run_bot():
|
|
while True:
|
|
try:
|
|
await bot.start(token)
|
|
except Exception:
|
|
await bot.logout()
|
|
loop = asyncio.get_event_loop()
|
|
try:
|
|
asyncio.ensure_future(poll_queue())
|
|
asyncio.ensure_future(run_bot())
|
|
loop.run_forever()
|
|
finally:
|
|
loop.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|