import datetime
import unicodedata

import dateutil.parser
import requests

from services.youtube import Youtube, YoutubeError
from services.cheesecom import CheeseCom, CheeseComError
from services.roll20 import Roll20, Roll20Error

INSTAGRAM_BASE_URL = 'https://www.instagram.com'
CLIPS_BASE_URL = 'https://clips.twitch.tv'


class CommandError(Exception):
    """Raised by ``Commands`` methods when a command cannot be completed."""


class Commands(object):
    """Command implementations backed by several HTTP APIs and services.

    ``config`` is indexed by section name (``'Quotes'``, ``'TwitchSubs'``,
    ``'Twitch'``, ``'Instagram'``, ``'Youtube'``) and each section is read
    with ``.get(key)``.  Failures are reported as ``CommandError``.
    """

    def __init__(self, config, logger):
        """Store the configuration and logger; no roll formula is known yet."""
        self.config = config
        self.logger = logger
        # Formula of the last successful roll, reused when ``roll`` is
        # called without a formula.
        self.last_roll_formula = None

    def add_quote(self, id, text, game, date):
        """POST a quote to the quotes API.

        ``id`` is converted with ``int`` and ``date`` is parsed day-first
        and sent as an ISO date string.

        Raises:
            CommandError: if the API answers with an HTTP error status.
        """
        self._submit_quote(id, text, game, date)

    def edit_quote(self, id, text, game, date):
        """POST a quote to the quotes API (same request as ``add_quote``).

        Raises:
            CommandError: if the API answers with an HTTP error status.
        """
        self._submit_quote(id, text, game, date)

    def _submit_quote(self, id, text, game, date):
        """Build the quote payload and POST it, wrapping HTTP errors."""
        payload = dict(
            id=int(id),
            date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
            game=game,
            text=text)
        try:
            self._post_quotes(payload)
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def remove_quote(self, id):
        """DELETE the quote with the given id (converted with ``int``).

        Raises:
            CommandError: if the API answers with an HTTP error status.
        """
        try:
            self._delete_quotes(int(id))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def record_regular_sub(self, user, rank, time=None):
        """POST a regular subscription record.

        ``time`` defaults to ``datetime.datetime.utcnow()`` and is sent in
        ISO format.

        Raises:
            CommandError: if the API answers with an HTTP error status.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        try:
            self._post_regular_subs(dict(
                user=user,
                rank=rank,
                time=time.isoformat()))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def record_gifted_sub(self, giver, receiver, time=None):
        """POST a gifted subscription record.

        ``time`` defaults to ``datetime.datetime.utcnow()`` and is sent in
        ISO format.

        Raises:
            CommandError: if the API answers with an HTTP error status.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        try:
            self._post_gifted_subs(dict(
                giver=giver,
                receiver=receiver,
                time=time.isoformat()))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def get_quote_messages(self):
        """Return ``(message_body, time)`` pairs for 'quote' messages.

        Only messages timestamped on or after 2018-05-11 are returned.
        Messages without a timestamp (``time`` is ``None``) are skipped:
        comparing ``None`` with a ``datetime`` would raise ``TypeError``.

        Raises:
            CommandError: propagated from ``_get_messages``.
        """
        messages = self._get_messages('bellateeny', 'quote')
        # quotes before this date can have wrong IDs
        threshold = datetime.datetime(2018, 5, 11)
        # NOTE(review): assumes parsed timestamps are timezone-naive like
        # ``threshold``; an aware value would still fail the comparison --
        # confirm the format returned by the cache API.
        return [(m, t) for m, t in messages
                if t is not None and t >= threshold]

    def get_sub_messages(self):
        """Return ``(message_body, time)`` pairs for subscription messages.

        Combines the results for the terms 'cheese hype' and 'gift', in
        that order.

        Raises:
            CommandError: propagated from ``_get_messages``.
        """
        messages = self._get_messages('bellateeny', 'cheese hype')
        messages.extend(self._get_messages('bellateeny', 'gift'))
        return messages

    def plausible_gifted_subs(self, time):
        """Return gifted subs recorded in the 31 days up to ``time``.

        The request asks for records older than ``time`` and newer than
        ``time`` minus 31 days, sorted by time descending.

        Raises:
            CommandError: if the request fails with an HTTP error status.
        """
        try:
            gifted_subs = self._get_gifted_subs(dict(
                older_than=time.isoformat(),
                newer_than=(time - datetime.timedelta(days=31)).isoformat(),
                sort_by='time',
                sort_order='desc')).json()
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no gifted subs found')
        else:
            return gifted_subs

    def last_quote(self):
        """Return the first quote when sorting by id in descending order.

        Raises:
            CommandError: on an HTTP error status or an empty result.
        """
        try:
            quotes = self._get_quotes(dict(
                sort_by='id',
                sort_order='desc',
                page_size=1)).json()
            quote = quotes.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no quotes found')
        else:
            return quote

    def find_quote(self, filter):
        """Return one quote matching ``filter``, requested in random order.

        Raises:
            CommandError: if ``filter`` is shorter than 3 characters, on an
                HTTP error status, or when nothing matches.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        try:
            quotes = self._get_quotes(dict(
                filter=filter,
                sort_order='random',
                page_size=1)).json()
            quote = quotes.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no quotes found')
        else:
            return quote

    def bellagram(self):
        """Return one Instagram image entry, requested in random order.

        The request filters by the configured keyword and the type
        'GraphImage'.  The returned item gets two extra keys, ``'url'`` and
        ``'owner_url'``, built from its ``'shortcode'`` and
        ``'owner_username'``.

        Raises:
            CommandError: on an HTTP error status or an empty result.
        """
        keyword = self.config['Instagram'].get('keyword')
        try:
            media = self._get_instagram_media(dict(
                filter=keyword,
                type='GraphImage',
                sort_order='random',
                page_size=1)).json()
            bellagram = media.pop(0)
            bellagram['url'] = '{0}/p/{1}'.format(
                INSTAGRAM_BASE_URL, bellagram['shortcode'])
            bellagram['owner_url'] = '{0}/{1}'.format(
                INSTAGRAM_BASE_URL, bellagram['owner_username'])
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no bellagrams found')
        else:
            return bellagram

    def query_youtube(self, query):
        """Return ``Youtube.find_best_match`` for the configured channels.

        The channel ids are read from the configuration as a
        comma-separated string.

        Raises:
            CommandError: if the lookup raises ``YoutubeError``.
        """
        api_key = self.config['Youtube'].get('api_key')
        channel_ids = self.config['Youtube'].get('channel_ids').split(',')
        yt = Youtube(api_key)
        try:
            result = yt.find_best_match(channel_ids, query)
        except YoutubeError:
            raise CommandError('couldn\'t find anything on Youtube')
        else:
            return result

    def find_clip(self, filter):
        """Return one clip matching ``filter``, requested in random order.

        The returned item gets an extra ``'url'`` key built from its
        ``'slug'``.

        Raises:
            CommandError: if ``filter`` is shorter than 3 characters, on an
                HTTP error status, or when nothing matches.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        try:
            clips = self._get_clips(dict(
                filter=filter,
                sort_order='random',
                page_size=1)).json()
            clip = clips.pop(0)
            clip['url'] = '{0}/{1}'.format(CLIPS_BASE_URL, clip['slug'])
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no clips found')
        else:
            return clip

    def query_cheese_com(self, query):
        """Query ``CheeseCom``, retrying once with accents stripped.

        Raises:
            CommandError: if both attempts raise ``CheeseComError``.
        """
        def remove_accents(s):
            # Decompose characters, then drop the combining marks.
            s = unicodedata.normalize('NFKD', s)
            return ''.join([c for c in s if not unicodedata.combining(c)])

        cc = CheeseCom()
        try:
            try:
                result = cc.query(query)
            except CheeseComError:
                # Second attempt with the accent-free spelling.
                result = cc.query(remove_accents(query))
        except CheeseComError:
            raise CommandError('couldn\'t find exact match on cheese.com')
        else:
            return result

    def next_stream(self):
        """Return the first event starting after the current UTC time.

        Events are requested sorted by start ascending with page size 1.

        Raises:
            CommandError: on an HTTP error status or an empty result.
        """
        try:
            events = self._get_events(dict(
                sort_by='start',
                sort_order='asc',
                newer_than=datetime.datetime.utcnow().isoformat(),
                page_size=1)).json()
            event = events.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no events found')
        else:
            return event

    def roll(self, formula, plaintext=False):
        """Execute a roll formula via ``Roll20.execute``.

        An empty ``formula`` reuses the formula of the last successful
        roll.  The formula is remembered only when execution succeeds.

        Raises:
            CommandError: if execution raises ``Roll20Error``.
        """
        if not formula:
            # NOTE(review): this is None until a roll has succeeded;
            # how Roll20.execute handles None is not visible here.
            formula = self.last_roll_formula
        try:
            result = Roll20.execute(formula, plaintext)
        except Roll20Error:
            raise CommandError('failed to interpret or execute the formula')
        else:
            self.last_roll_formula = formula
            return result

    def _get_instagram_media(self, params):
        """GET ``/media`` from the Instagram API; raise on error status."""
        api_url = self.config['Instagram'].get('api_url')
        r = requests.get('{0}/media'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _get_quotes(self, params):
        """GET ``/quotes`` from the quotes API; raise on error status."""
        api_url = self.config['Quotes'].get('api_url')
        r = requests.get('{0}/quotes'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _post_quotes(self, data):
        """POST ``data`` to ``/quotes`` with the API key header."""
        api_url = self.config['Quotes'].get('api_url')
        api_key = self.config['Quotes'].get('api_key')
        r = requests.post(
            '{0}/quotes'.format(api_url),
            data=data,
            headers={'X-Quotes-API-Key': api_key})
        r.raise_for_status()
        return r

    def _delete_quotes(self, id):
        """DELETE ``/quotes/<id>`` with the API key header."""
        api_url = self.config['Quotes'].get('api_url')
        api_key = self.config['Quotes'].get('api_key')
        r = requests.delete(
            '{0}/quotes/{1}'.format(api_url, id),
            headers={'X-Quotes-API-Key': api_key})
        r.raise_for_status()
        return r

    def _get_regular_subs(self, params):
        """GET ``/regular-subs`` from the subs API; raise on error status."""
        api_url = self.config['TwitchSubs'].get('api_url')
        r = requests.get('{0}/regular-subs'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _post_regular_subs(self, data):
        """POST ``data`` to ``/regular-subs`` with the API key header."""
        api_url = self.config['TwitchSubs'].get('api_url')
        api_key = self.config['TwitchSubs'].get('api_key')
        r = requests.post(
            '{0}/regular-subs'.format(api_url),
            data=data,
            headers={'X-Twitch-Subs-API-Key': api_key})
        r.raise_for_status()
        return r

    def _get_gifted_subs(self, params):
        """GET ``/gifted-subs`` from the subs API; raise on error status."""
        api_url = self.config['TwitchSubs'].get('api_url')
        r = requests.get('{0}/gifted-subs'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _post_gifted_subs(self, data):
        """POST ``data`` to ``/gifted-subs`` with the API key header."""
        api_url = self.config['TwitchSubs'].get('api_url')
        api_key = self.config['TwitchSubs'].get('api_key')
        r = requests.post(
            '{0}/gifted-subs'.format(api_url),
            data=data,
            headers={'X-Twitch-Subs-API-Key': api_key})
        r.raise_for_status()
        return r

    def _get_comments(self, params):
        """GET ``/search`` from the Twitch cache API; raise on error status."""
        api_url = self.config['Twitch'].get('cache_api_url')
        r = requests.get('{0}/search'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _get_messages(self, commenter, term):
        """Return ``(message_body, time)`` pairs for matching comments.

        ``time`` is the parsed ``'video_recorded_at'`` value plus the
        ``'offset'`` in seconds, or ``None`` when ``'video_recorded_at'``
        is missing or empty.  A missing ``'message_body'`` becomes ``''``.

        Raises:
            CommandError: if the API answers with an HTTP error status.
        """
        def get_time(message):
            recorded = message.get('video_recorded_at')
            if not recorded:
                return None
            result = dateutil.parser.parse(recorded)
            result += datetime.timedelta(
                seconds=float(message.get('offset', 0)))
            return result

        try:
            messages = self._get_comments(dict(
                commenter=commenter,
                term=term)).json()
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)
        else:
            return [(m.get('message_body', ''), get_time(m))
                    for m in messages]

    def _get_clips(self, params):
        """GET ``/clips`` from the Twitch cache API; raise on error status."""
        api_url = self.config['Twitch'].get('cache_api_url')
        r = requests.get('{0}/clips'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _get_events(self, params):
        """GET ``/events`` from the Twitch cache API; raise on error status."""
        api_url = self.config['Twitch'].get('cache_api_url')
        r = requests.get('{0}/events'.format(api_url), params=params)
        r.raise_for_status()
        return r