You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
5.5 KiB

import datetime
import random
import dateutil.parser
import requests
from services.instagram import Instagram, InstagramError
from services.youtube import Youtube, YoutubeError
CLIPS_BASE_URL = 'https://clips.twitch.tv'
class CommandError(Exception):
pass
class Commands(object):
def __init__(self, config, logger):
self.config = config
self.logger = logger
self._bellagrams = self._collect_bellagrams()
def add_quote(self, id, text, game, date):
try:
self._post_quotes(dict(
id=int(id),
date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
game=game,
text=text))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def edit_quote(self, id, text, game, date):
try:
self._post_quotes(dict(
id=int(id),
date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
game=game,
text=text))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def remove_quote(self, id):
try:
self._delete_quotes(int(id))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def get_quote_messages(self):
try:
messages = self._get_messages(dict(
commenter='bellateeny',
term='quote')).json()
except requests.exceptions.HTTPError as e:
raise CommandError(e)
else:
# quotes before this date can have wrong IDs
threshold = datetime.datetime(2018, 5, 11)
messages = [m for m in messages if dateutil.parser.parse(m['created_at']) >= threshold]
return [m.get('message_body', '') for m in messages]
def last_quote(self):
try:
quotes = self._get_quotes(dict(
sort_by='id',
sort_order='desc',
page_size=1)).json()
quote = quotes.pop(0)
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no quotes found')
else:
return quote
def find_quote(self, filter):
if len(filter) < 3:
raise CommandError('the search phrase is too short')
try:
quotes = self._get_quotes(dict(
filter=filter,
sort_order='random',
page_size=1)).json()
quote = quotes.pop(0)
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no quotes found')
else:
return quote
def bellagram(self):
if not self._bellagrams:
raise CommandError('couldn\'t get any media from Instagram')
return random.choice(self._bellagrams)
def query_youtube(self, query):
api_key = self.config['Youtube'].get('api_key')
channel_ids = self.config['Youtube'].get('channel_ids').split(',')
yt = Youtube(api_key)
try:
result = yt.find_best_match(channel_ids, query)
except YoutubeError:
raise CommandError('couldn\'t find anything on Youtube')
else:
return result
def find_clip(self, filter):
if len(filter) < 3:
raise CommandError('the search phrase is too short')
try:
clips = self._get_clips(dict(
filter=filter,
sort_order='random',
page_size=1)).json()
clip = clips.pop(0)
clip['url'] = '{0}/{1}'.format(CLIPS_BASE_URL, clip['slug'])
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no clips found')
else:
return clip
def _get_quotes(self, params):
api_url = self.config['Quotes'].get('api_url')
r = requests.get('{0}/quotes'.format(api_url), params=params)
r.raise_for_status()
return r
def _post_quotes(self, data):
api_url = self.config['Quotes'].get('api_url')
api_key = self.config['Quotes'].get('api_key')
r = requests.post('{0}/quotes'.format(api_url), data=data,
headers={'X-Quotes-API-Key': api_key})
r.raise_for_status()
return r
def _delete_quotes(self, id):
api_url = self.config['Quotes'].get('api_url')
api_key = self.config['Quotes'].get('api_key')
r = requests.delete('{0}/quotes/{1}'.format(api_url, id),
headers={'X-Quotes-API-Key': api_key})
r.raise_for_status()
return r
def _get_messages(self, params):
api_url = self.config['Twitch'].get('cache_api_url')
r = requests.get('{0}/search'.format(api_url), params=params)
r.raise_for_status()
return r
def _get_clips(self, params):
api_url = self.config['Twitch'].get('cache_api_url')
r = requests.get('{0}/clips'.format(api_url), params=params)
r.raise_for_status()
return r
def _collect_bellagrams(self):
username = self.config['Instagram'].get('username')
keywords = self.config['Instagram'].get('keywords').split(',')
instagram = Instagram(username)
try:
media = instagram.get_media()
media = [m for m in media if [k for k in keywords \
if m['type'] == 'Image' and k.lower() in m['title'].lower()]]
except (InstagramError, TypeError, KeyError):
return None
else:
return media