You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

252 lines
8.5 KiB

import datetime
import random
import dateutil.parser
import requests
from services.instagram import Instagram, InstagramError
from services.youtube import Youtube, YoutubeError
CLIPS_BASE_URL = 'https://clips.twitch.tv'
class CommandError(Exception):
pass
class Commands(object):
def __init__(self, config, logger):
self.config = config
self.logger = logger
self._bellagrams = self._collect_bellagrams()
def add_quote(self, id, text, game, date):
try:
self._post_quotes(dict(
id=int(id),
date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
game=game,
text=text))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def edit_quote(self, id, text, game, date):
try:
self._post_quotes(dict(
id=int(id),
date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
game=game,
text=text))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def remove_quote(self, id):
try:
self._delete_quotes(int(id))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def record_regular_sub(self, user, rank, time=None):
if time is None:
time = datetime.datetime.utcnow()
try:
self._post_regular_subs(dict(
user=user,
rank=rank,
time=time.isoformat()))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def record_gifted_sub(self, giver, receiver, time=None):
if time is None:
time = datetime.datetime.utcnow()
try:
self._post_gifted_subs(dict(
giver=giver,
receiver=receiver,
time=time.isoformat()))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def get_quote_messages(self):
try:
messages = self._get_messages('bellateeny', 'quote')
except CommandError:
raise
else:
# quotes before this date can have wrong IDs
threshold = datetime.datetime(2018, 5, 11)
messages = [(m, t) for m, t in messages if t >= threshold]
return messages
def get_sub_messages(self):
try:
messages = self._get_messages('bellateeny', 'cheese hype')
messages.extend(self._get_messages('bellateeny', 'gift'))
except CommandError:
raise
else:
return messages
def plausible_gifted_subs(self, time):
try:
gifted_subs = self._get_gifted_subs(dict(
older_than=time.isoformat(),
newer_than=(time - datetime.timedelta(days=31)).isoformat(),
sort_by='time',
sort_order='desc')).json()
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no gifted subs found')
else:
return gifted_subs
def last_quote(self):
try:
quotes = self._get_quotes(dict(
sort_by='id',
sort_order='desc',
page_size=1)).json()
quote = quotes.pop(0)
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no quotes found')
else:
return quote
def find_quote(self, filter):
if len(filter) < 3:
raise CommandError('the search phrase is too short')
try:
quotes = self._get_quotes(dict(
filter=filter,
sort_order='random',
page_size=1)).json()
quote = quotes.pop(0)
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no quotes found')
else:
return quote
def bellagram(self):
if not self._bellagrams:
raise CommandError('couldn\'t get any media from Instagram')
return random.choice(self._bellagrams)
def query_youtube(self, query):
api_key = self.config['Youtube'].get('api_key')
channel_ids = self.config['Youtube'].get('channel_ids').split(',')
yt = Youtube(api_key)
try:
result = yt.find_best_match(channel_ids, query)
except YoutubeError:
raise CommandError('couldn\'t find anything on Youtube')
else:
return result
def find_clip(self, filter):
if len(filter) < 3:
raise CommandError('the search phrase is too short')
try:
clips = self._get_clips(dict(
filter=filter,
sort_order='random',
page_size=1)).json()
clip = clips.pop(0)
clip['url'] = '{0}/{1}'.format(CLIPS_BASE_URL, clip['slug'])
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no clips found')
else:
return clip
def _get_quotes(self, params):
api_url = self.config['Quotes'].get('api_url')
r = requests.get('{0}/quotes'.format(api_url), params=params)
r.raise_for_status()
return r
def _post_quotes(self, data):
api_url = self.config['Quotes'].get('api_url')
api_key = self.config['Quotes'].get('api_key')
r = requests.post('{0}/quotes'.format(api_url), data=data,
headers={'X-Quotes-API-Key': api_key})
r.raise_for_status()
return r
def _delete_quotes(self, id):
api_url = self.config['Quotes'].get('api_url')
api_key = self.config['Quotes'].get('api_key')
r = requests.delete('{0}/quotes/{1}'.format(api_url, id),
headers={'X-Quotes-API-Key': api_key})
r.raise_for_status()
return r
def _get_regular_subs(self, params):
api_url = self.config['TwitchSubs'].get('api_url')
r = requests.get('{0}/regular-subs'.format(api_url), params=params)
r.raise_for_status()
return r
def _post_regular_subs(self, data):
api_url = self.config['TwitchSubs'].get('api_url')
api_key = self.config['TwitchSubs'].get('api_key')
r = requests.post('{0}/regular-subs'.format(api_url), data=data,
headers={'X-Twitch-Subs-API-Key': api_key})
r.raise_for_status()
return r
def _get_gifted_subs(self, params):
api_url = self.config['TwitchSubs'].get('api_url')
r = requests.get('{0}/gifted-subs'.format(api_url), params=params)
r.raise_for_status()
return r
def _post_gifted_subs(self, data):
api_url = self.config['TwitchSubs'].get('api_url')
api_key = self.config['TwitchSubs'].get('api_key')
r = requests.post('{0}/gifted-subs'.format(api_url), data=data,
headers={'X-Twitch-Subs-API-Key': api_key})
r.raise_for_status()
return r
def _get_comments(self, params):
api_url = self.config['Twitch'].get('cache_api_url')
r = requests.get('{0}/search'.format(api_url), params=params)
r.raise_for_status()
return r
def _get_messages(self, commenter, term):
def get_time(message):
recorded = message.get('video_recorded_at')
if not recorded:
return None
result = dateutil.parser.parse(recorded)
result += datetime.timedelta(seconds=float(message.get('offset', 0)))
return result
try:
messages = self._get_comments(dict(
commenter=commenter,
term=term)).json()
except requests.exceptions.HTTPError as e:
raise CommandError(e)
else:
return [(m.get('message_body', ''), get_time(m)) for m in messages]
def _get_clips(self, params):
api_url = self.config['Twitch'].get('cache_api_url')
r = requests.get('{0}/clips'.format(api_url), params=params)
r.raise_for_status()
return r
def _collect_bellagrams(self):
username = self.config['Instagram'].get('username')
keywords = self.config['Instagram'].get('keywords').split(',')
instagram = Instagram(username)
try:
media = instagram.get_media()
media = [m for m in media if [k for k in keywords \
if m['type'] == 'Image' and k.lower() in m['title'].lower()]]
except (InstagramError, TypeError, KeyError):
return None
else:
return media