Implement !reaction command

master
Nikola Forró 4 years ago
parent 63ac656eb5
commit 2a7e910d46

@ -5,11 +5,14 @@ import configparser
import json
import logging
import pathlib
import re
import discord
import persistqueue
import tweepy
from youtube import YouTube, YouTubeError
TIMEOUT = 1
@ -18,6 +21,8 @@ TWITTER_STATUS_URL = 'https://twitter.com/i/status/'
TWITTER_ICON_URL = 'https://abs.twimg.com/icons/apple-touch-icon-192x192.png'
TWITTER_COLOR = 0x1da1f2
YOUTUBE_COLOR = 0xff0000
logger = logging.getLogger('discord')
logger.addHandler(logging.StreamHandler())
@ -36,8 +41,18 @@ channel_id = config.getint('Discord', 'channel_id')
consumer_key = config.get('Twitter', 'consumer_key')
consumer_secret = config.get('Twitter', 'consumer_secret')
yt_api_key = config.get('YouTube', 'api_key')
yt_channel_id = config.get('YouTube', 'channel_id')
class Bot(discord.Client):
def __init__(self):
self.commands = (
re.compile(r'^(?P<prefix>!)(?P<command>reaction)\s+(?P<q>")?(?P<query>.+)(?(q)")$'),
)
self.youtube = YouTube(yt_api_key, yt_channel_id)
super().__init__()
def get_api(self, user_id):
handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
with open(pathlib.Path(storage_path, 'tokens.json'), 'r') as f:
@ -88,6 +103,35 @@ class Bot(discord.Client):
for tweet in tweets:
await channel.send(embed=self.make_embed(tweet))
async def on_message(self, message):
for pattern in self.commands:
m = pattern.match(message.content)
if not m:
continue
d = m.groupdict()
command = d.pop('command')
action = getattr(self, 'perform_{0}'.format(command))
await action(message, **d)
async def perform_reaction(self, message, query, **kwargs):
try:
result = self.youtube.search(query)
except YouTubeError as e:
await message.channel.send('Sorry {0}, {1}'.format(message.author.mention, str(e)))
return
if not result:
await message.channel.send('Sorry {0}, nothing found'.format(message.author.mention))
return
if result.get('kind') == 'playlist':
embed = discord.Embed(title=result.get('title'), url=result.get('link'),
description=result.get('description'), color=YOUTUBE_COLOR)
embed.set_thumbnail(url=result.get('thumbnail_url'))
embed.set_author(name=self.youtube.channel.get('title'), url=self.youtube.channel.get('link'),
icon_url=self.youtube.channel.get('thumbnail_url'))
await message.channel.send(embed=embed)
else:
await message.channel.send('{title}\n{link}'.format(**result))
def main():
bot = Bot()

@ -0,0 +1,120 @@
import html
import fuzzywuzzy.fuzz
import fuzzywuzzy.process
import googleapiclient
import googleapiclient.discovery
BASE_URL = 'https://www.youtube.com'
class YouTube(object):
def __init__(self, api_key, channel_id):
self.client = googleapiclient.discovery.build('youtube', 'v3', developerKey=api_key)
self.channel = self.get_channel(channel_id)
def get_thumbnail_url(self, thumbnails):
for key in ('high', 'medium', 'default'):
if key in thumbnails:
return thumbnails[key].get('url')
def process_items(self, items):
result = []
for item in items:
id = item.get('id', '')
kind = item.get('kind', 'youtube#').split('youtube#')[1]
if kind == 'searchResult':
id = item.get('id', {}).get('videoId', '')
kind = item.get('id', {}).get('kind', 'youtube#').split('youtube#')[1]
if kind == 'playlist':
link = '{0}/view_play_list?p={1}'.format(BASE_URL, id)
else:
link = '{0}/watch?v={1}'.format(BASE_URL, id)
result.append(dict(
kind=kind,
link=link,
title=html.unescape(item.get('snippet', {}).get('title', '')),
description=html.unescape(item.get('snippet', {}).get('description', '')),
thumbnail_url=self.get_thumbnail_url(item.get('snippet', {}).get('thumbnails', {}))
))
return result
def get_channel(self, channel_id):
r = self.client.channels().list(id=channel_id, maxResults=1, part='id,snippet').execute()
channel = r.get('items', [{}]).pop()
return dict(
id=channel.get('id', ''),
link='{0}/c/{1}'.format(BASE_URL, channel.get('snippet', {}).get('customUrl', '')),
title=html.unescape(channel.get('snippet', {}).get('title', '')),
thumbnail_url=self.get_thumbnail_url(channel.get('snippet', {}).get('thumbnails', {}))
)
def get_playlists(self):
token = ''
result = []
while True:
r = self.client.playlists().list(
channelId=self.channel.get('id'),
maxResults=50,
part='id,snippet',
pageToken=token
).execute()
result.extend(self.process_items(r.get('items', [])))
token = r.get('nextPageToken')
if not token:
break
return result
def search_videos(self, query, limit):
count = limit
token = ''
result = []
while True:
r = self.client.search().list(
channelId=self.channel.get('id'),
q=query,
safeSearch='none',
type='video',
maxResults=min(count, 50),
part='id,snippet',
pageToken=token
).execute()
result.extend(self.process_items(r.get('items', [])))
count -= r.get('pageInfo', {}).get('resultsPerPage', 0)
if count <= 0:
break
token = r.get('nextPageToken')
if not token:
break
return result
def search(self, query):
try:
results = self.get_playlists()
results.extend(self.search_videos(query, limit=5))
except googleapiclient.errors.HttpError as e:
raise YouTubeError(str(e))
if not results:
return None
tokens = [t for t in query.split('|') if not t.strip().startswith('-')] or ['']
matches = []
for token in tokens:
titles = {i: r.get('title') for i, r in enumerate(results)}
descriptions = {i: r.get('description') for i, r in enumerate(results)}
match = fuzzywuzzy.process.extractOne(token, titles,
scorer=fuzzywuzzy.fuzz.token_set_ratio, score_cutoff=25)
if match:
matches.append(match)
match = fuzzywuzzy.process.extractOne(token, descriptions,
scorer=fuzzywuzzy.fuzz.token_set_ratio, score_cutoff=25)
if match:
matches.append(match)
if not matches:
return None
_, _, i = sorted(matches, key=lambda m: m[1], reverse=True)[0]
return results[i]
class YouTubeError(Exception):
pass
Loading…
Cancel
Save