import datetime
import random

import dateutil.parser
import requests

from services.instagram import Instagram, InstagramError
from services.youtube import Youtube, YoutubeError

CLIPS_BASE_URL = 'https://clips.twitch.tv'

# Commenter whose cached chat messages are searched for quotes and gifts.
MESSAGE_COMMENTER = 'bellateeny'

# Quotes before this date can have wrong IDs, so older messages are ignored.
QUOTE_MESSAGES_THRESHOLD = datetime.datetime(2018, 5, 11)


def _to_naive_utc(moment):
    """Return *moment* as a naive datetime, shifted to UTC if it was aware.

    Naive values are returned unchanged. This lets timestamps be compared
    against the naive ``QUOTE_MESSAGES_THRESHOLD`` without the ``TypeError``
    that mixing naive and aware datetimes raises.
    """
    offset = moment.utcoffset()
    if offset is None:
        return moment
    return moment.replace(tzinfo=None) - offset


class CommandError(Exception):
    """Raised when a command cannot be carried out."""
    pass


class Commands(object):
    """Command handlers backed by the services described in ``config``.

    ``config`` is indexed by section name ('Quotes', 'GiftedSubs', 'Twitch',
    'Instagram', 'Youtube'); each section must provide a ``get(key)`` method.
    Failures of the underlying HTTP APIs are reported as ``CommandError``.
    """

    def __init__(self, config, logger):
        """Store the configuration and logger and preload Instagram media."""
        self.config = config
        self.logger = logger
        self._bellagrams = self._collect_bellagrams()

    def add_quote(self, id, text, game, date):
        """Post a quote to the quotes API.

        ``id`` is converted with ``int()`` and ``date`` is parsed day-first
        and sent as an ISO date. Raises ``CommandError`` on an HTTP error.
        """
        self._save_quote(id, text, game, date)

    def edit_quote(self, id, text, game, date):
        """Post a quote to the quotes API; same request as ``add_quote``.

        Raises ``CommandError`` on an HTTP error.
        """
        self._save_quote(id, text, game, date)

    def remove_quote(self, id):
        """Delete the quote with the given ID.

        Raises ``CommandError`` on an HTTP error.
        """
        try:
            self._delete_quotes(int(id))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def record_gifted_sub(self, giver, receiver, time=None):
        """Post a gifted subscription to the gifted-subs API.

        ``time`` defaults to the current naive UTC time. Raises
        ``CommandError`` on an HTTP error.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        try:
            self._post_gifted_subs(dict(
                giver=giver,
                receiver=receiver,
                time=time.isoformat()))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def get_quote_messages(self):
        """Return ``(message_body, time)`` pairs for cached 'quote' messages.

        Only messages timed on or after ``QUOTE_MESSAGES_THRESHOLD`` are
        returned. Messages without a timestamp are skipped, because they
        cannot be checked against the threshold (comparing ``None`` with a
        datetime raises ``TypeError``). Raises ``CommandError`` if the
        lookup fails.
        """
        return [
            (body, moment)
            for body, moment in self._get_messages(MESSAGE_COMMENTER, 'quote')
            if moment is not None
            and _to_naive_utc(moment) >= QUOTE_MESSAGES_THRESHOLD
        ]

    def get_gifted_sub_messages(self):
        """Return ``(message_body, time)`` pairs for cached 'gift' messages.

        Raises ``CommandError`` if the lookup fails.
        """
        return self._get_messages(MESSAGE_COMMENTER, 'gift')

    def last_quote(self):
        """Return the quote with the highest ID.

        Raises ``CommandError`` if the request fails or returns no quotes.
        """
        try:
            quotes = self._get_quotes(dict(
                sort_by='id',
                sort_order='desc',
                page_size=1)).json()
            quote = quotes.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no quotes found')
        else:
            return quote

    def find_quote(self, filter):
        """Return one randomly ordered quote matching ``filter``.

        Raises ``CommandError`` if ``filter`` is shorter than three
        characters, or if the request fails or returns no quotes.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        try:
            quotes = self._get_quotes(dict(
                filter=filter,
                sort_order='random',
                page_size=1)).json()
            quote = quotes.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no quotes found')
        else:
            return quote

    def bellagram(self):
        """Return a random item from the preloaded Instagram media.

        Raises ``CommandError`` if no media was collected.
        """
        if not self._bellagrams:
            raise CommandError('couldn\'t get any media from Instagram')
        return random.choice(self._bellagrams)

    def query_youtube(self, query):
        """Return the best Youtube match for ``query`` in the configured
        channels.

        Raises ``CommandError`` if the Youtube service reports an error.
        """
        api_key = self.config['Youtube'].get('api_key')
        channel_ids = self.config['Youtube'].get('channel_ids').split(',')
        yt = Youtube(api_key)
        try:
            result = yt.find_best_match(channel_ids, query)
        except YoutubeError:
            raise CommandError('couldn\'t find anything on Youtube')
        else:
            return result

    def find_clip(self, filter):
        """Return one randomly ordered clip matching ``filter``.

        The clip gets a ``'url'`` entry built from ``CLIPS_BASE_URL`` and its
        ``'slug'``. Raises ``CommandError`` if ``filter`` is shorter than
        three characters, or if the request fails or returns no clips.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        try:
            clips = self._get_clips(dict(
                filter=filter,
                sort_order='random',
                page_size=1)).json()
            clip = clips.pop(0)
            clip['url'] = '{0}/{1}'.format(CLIPS_BASE_URL, clip['slug'])
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no clips found')
        else:
            return clip

    def _save_quote(self, id, text, game, date):
        """Post one quote; shared implementation of add_quote/edit_quote."""
        try:
            self._post_quotes(dict(
                id=int(id),
                date=dateutil.parser.parse(
                    date, dayfirst=True).date().isoformat(),
                game=game,
                text=text))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def _get_quotes(self, params):
        """GET ``<api_url>/quotes``; raises ``HTTPError`` on a bad status."""
        api_url = self.config['Quotes'].get('api_url')
        r = requests.get('{0}/quotes'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _post_quotes(self, data):
        """POST to ``<api_url>/quotes``; raises ``HTTPError`` on a bad
        status."""
        api_url = self.config['Quotes'].get('api_url')
        api_key = self.config['Quotes'].get('api_key')
        r = requests.post(
            '{0}/quotes'.format(api_url),
            data=data,
            headers={'X-Quotes-API-Key': api_key})
        r.raise_for_status()
        return r

    def _delete_quotes(self, id):
        """DELETE ``<api_url>/quotes/<id>``; raises ``HTTPError`` on a bad
        status."""
        api_url = self.config['Quotes'].get('api_url')
        api_key = self.config['Quotes'].get('api_key')
        r = requests.delete(
            '{0}/quotes/{1}'.format(api_url, id),
            headers={'X-Quotes-API-Key': api_key})
        r.raise_for_status()
        # FIXME: reindex subsequent quotes
        return r

    def _post_gifted_subs(self, data):
        """POST to ``<api_url>/gifted-subs``; raises ``HTTPError`` on a bad
        status."""
        api_url = self.config['GiftedSubs'].get('api_url')
        api_key = self.config['GiftedSubs'].get('api_key')
        r = requests.post(
            '{0}/gifted-subs'.format(api_url),
            data=data,
            headers={'X-Gifted-Subs-API-Key': api_key})
        r.raise_for_status()
        return r

    def _get_comments(self, params):
        """GET ``<cache_api_url>/search``; raises ``HTTPError`` on a bad
        status."""
        api_url = self.config['Twitch'].get('cache_api_url')
        r = requests.get('{0}/search'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _get_messages(self, commenter, term):
        """Return ``(message_body, time)`` pairs for matching cached comments.

        ``time`` is the parsed ``video_recorded_at`` value plus ``offset``
        seconds, or ``None`` when the comment has no ``video_recorded_at``.
        Raises ``CommandError`` on an HTTP error.
        """
        def get_time(message):
            recorded = message.get('video_recorded_at')
            if not recorded:
                return None
            # A missing or null offset counts as zero seconds.
            seconds = float(message.get('offset') or 0)
            started = dateutil.parser.parse(recorded)
            return started + datetime.timedelta(seconds=seconds)

        try:
            messages = self._get_comments(dict(
                commenter=commenter,
                term=term)).json()
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)
        else:
            return [(m.get('message_body', ''), get_time(m))
                    for m in messages]

    def _get_clips(self, params):
        """GET ``<cache_api_url>/clips``; raises ``HTTPError`` on a bad
        status."""
        api_url = self.config['Twitch'].get('cache_api_url')
        r = requests.get('{0}/clips'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _collect_bellagrams(self):
        """Return the Instagram images whose title contains a keyword.

        The match is case-insensitive against the comma-separated
        ``keywords`` setting. Returns ``None`` if the media cannot be
        fetched or an item lacks the expected fields; ``bellagram`` treats
        that as "no media".
        """
        username = self.config['Instagram'].get('username')
        keywords = self.config['Instagram'].get('keywords').split(',')
        instagram = Instagram(username)
        try:
            media = instagram.get_media()
            return [
                m for m in media
                if m['type'] == 'Image'
                and any(k.lower() in m['title'].lower() for k in keywords)
            ]
        except (InstagramError, TypeError, KeyError):
            return None