"""Command implementations backed by several HTTP APIs and service wrappers.

Expected failures (HTTP/network errors, empty results, service errors) are
reported by raising :class:`CommandError`.
"""

import datetime
import unicodedata

import dateutil.parser
import requests

from services.cheesecom import CheeseCom, CheeseComError
from services.converter import Converter, ConverterError
from services.exchangerates import ExchangeRates, ExchangeRatesError
from services.roll20 import Roll20, Roll20Error
from services.youtube import Youtube, YoutubeError

INSTAGRAM_BASE_URL = 'https://www.instagram.com'
CLIPS_BASE_URL = 'https://clips.twitch.tv'

# Upper bound, in seconds, applied to every HTTP call made by this module.
# Without an explicit timeout ``requests`` waits indefinitely for a server
# that never answers.
REQUEST_TIMEOUT = 30


class CommandError(Exception):
    """Raised when a command cannot be carried out."""


class Commands(object):
    """Collection of commands that talk to the configured backends.

    Attributes:
        config: Mapping of section name to options; options are read with
            ``config[section].get(option)``.
        logger: Logger handed in by the caller (stored, not used in this
            class itself).
        last_roll_formula: The formula of the last successful :meth:`roll`,
            reused when :meth:`roll` is called without a formula.
        rates: ``ExchangeRates`` instance queried by :meth:`convert`.
    """

    def __init__(self, config, logger):
        self.config = config
        self.logger = logger
        self.last_roll_formula = None
        self.rates = ExchangeRates()

    # ------------------------------------------------------------------
    # Quotes
    # ------------------------------------------------------------------

    def add_quote(self, id, text, game, date):
        """Post a quote to the quotes API.

        ``date`` is parsed day-first and sent as an ISO date.

        Raises:
            CommandError: If the request fails.
            ValueError: If ``id`` is not an integer (a malformed ``date``
                also propagates the parser's own error).
        """
        self._save_quote(id, text, game, date)

    def edit_quote(self, id, text, game, date):
        """Post an updated quote to the quotes API.

        Sends exactly the same request as :meth:`add_quote`.

        Raises:
            CommandError: If the request fails.
            ValueError: If ``id`` is not an integer (a malformed ``date``
                also propagates the parser's own error).
        """
        self._save_quote(id, text, game, date)

    def remove_quote(self, id):
        """Delete the quote with the given ID.

        Raises:
            CommandError: If the request fails.
        """
        try:
            self._delete_quotes(int(id))
        except requests.exceptions.RequestException as e:
            raise CommandError(e)

    def last_quote(self):
        """Return the quote with the highest ID.

        Raises:
            CommandError: If the request fails or no quote exists.
        """
        return self._fetch_first(
            self._get_quotes,
            dict(sort_by='id', sort_order='desc', page_size=1),
            'no quotes found')

    def find_quote(self, filter):
        """Return a random quote matching ``filter``.

        Raises:
            CommandError: If ``filter`` is shorter than three characters,
                the request fails, or nothing matches.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        return self._fetch_first(
            self._get_quotes,
            dict(filter=filter, sort_order='random', page_size=1),
            'no quotes found')

    # ------------------------------------------------------------------
    # Subscriptions
    # ------------------------------------------------------------------

    def record_regular_sub(self, user, rank, time=None):
        """Record a regular subscription; ``time`` defaults to the current UTC time.

        Raises:
            CommandError: If the request fails.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        payload = dict(user=user, rank=rank, time=time.isoformat())
        try:
            self._post_regular_subs(payload)
        except requests.exceptions.RequestException as e:
            raise CommandError(e)

    def record_gifted_sub(self, giver, receiver, time=None):
        """Record a gifted subscription; ``time`` defaults to the current UTC time.

        Raises:
            CommandError: If the request fails.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        payload = dict(giver=giver, receiver=receiver, time=time.isoformat())
        try:
            self._post_gifted_subs(payload)
        except requests.exceptions.RequestException as e:
            raise CommandError(e)

    def plausible_gifted_subs(self, time):
        """Return gifted subs from the 31 days up to ``time``, newest first.

        An empty result is returned as an empty JSON payload, not an error.

        Raises:
            CommandError: If the request fails.
        """
        window_start = time - datetime.timedelta(days=31)
        try:
            return self._get_gifted_subs(dict(
                older_than=time.isoformat(),
                newer_than=window_start.isoformat(),
                sort_by='time',
                sort_order='desc')).json()
        except requests.exceptions.RequestException:
            raise CommandError('no gifted subs found')

    # ------------------------------------------------------------------
    # Cached chat messages
    # ------------------------------------------------------------------

    def get_quote_messages(self):
        """Return ``(body, time)`` pairs for 'quote' messages by 'bellateeny'.

        Messages older than the threshold, and messages without a usable
        timestamp, are left out.

        Raises:
            CommandError: If the request fails.
        """
        # quotes before this date can have wrong IDs
        threshold = datetime.datetime(2018, 5, 11)
        messages = self._get_messages('bellateeny', 'quote')
        # ``t`` is None when a message has no recording time; comparing None
        # with a datetime raises TypeError, so such entries are skipped.
        # NOTE(review): assumes parsed timestamps are naive like the
        # threshold - confirm against the cache API's date format.
        return [(m, t) for m, t in messages
                if t is not None and t >= threshold]

    def get_sub_messages(self):
        """Return ``(body, time)`` pairs for 'cheese hype' and 'gift' messages.

        Raises:
            CommandError: If a request fails.
        """
        messages = self._get_messages('bellateeny', 'cheese hype')
        messages.extend(self._get_messages('bellateeny', 'gift'))
        return messages

    # ------------------------------------------------------------------
    # Media, clips, events
    # ------------------------------------------------------------------

    def bellagram(self):
        """Return a random image post matching the configured keyword.

        The returned dict gains ``url`` and ``owner_url`` entries built from
        its ``shortcode`` and ``owner_username``.

        Raises:
            CommandError: If the request fails or nothing matches.
        """
        keyword = self.config['Instagram'].get('keyword')
        bellagram = self._fetch_first(
            self._get_instagram_media,
            dict(filter=keyword, type='GraphImage', sort_order='random',
                 page_size=1),
            'no bellagrams found')
        bellagram['url'] = '{0}/p/{1}'.format(
            INSTAGRAM_BASE_URL, bellagram['shortcode'])
        bellagram['owner_url'] = '{0}/{1}'.format(
            INSTAGRAM_BASE_URL, bellagram['owner_username'])
        return bellagram

    def query_youtube(self, query):
        """Return the best Youtube match for ``query`` in the configured channels.

        Raises:
            CommandError: If the Youtube service reports an error.
        """
        api_key = self.config['Youtube'].get('api_key')
        channel_ids = self.config['Youtube'].get('channel_ids').split(',')
        yt = Youtube(api_key)
        try:
            return yt.find_best_match(channel_ids, query)
        except YoutubeError:
            raise CommandError('couldn\'t find anything on Youtube')

    def find_clip(self, filter):
        """Return a random clip matching ``filter``, with a ``url`` entry added.

        Raises:
            CommandError: If ``filter`` is shorter than three characters,
                the request fails, or nothing matches.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        clip = self._fetch_first(
            self._get_clips,
            dict(filter=filter, sort_order='random', page_size=1),
            'no clips found')
        clip['url'] = '{0}/{1}'.format(CLIPS_BASE_URL, clip['slug'])
        return clip

    def query_cheese_com(self, query):
        """Look ``query`` up on cheese.com, retrying once without accents.

        Raises:
            CommandError: If both lookups fail.
        """
        def remove_accents(s):
            # NFKD splits accented characters into base + combining mark;
            # dropping the combining marks leaves the plain base letters.
            s = unicodedata.normalize('NFKD', s)
            return ''.join([c for c in s if not unicodedata.combining(c)])

        cheese_com = CheeseCom()
        try:
            return cheese_com.query(query)
        except CheeseComError:
            pass  # fall through to the accent-free retry
        try:
            return cheese_com.query(remove_accents(query))
        except CheeseComError:
            raise CommandError('couldn\'t find exact match on cheese.com')

    def next_stream(self):
        """Return the earliest event starting after the current UTC time.

        Raises:
            CommandError: If the request fails or no event is found.
        """
        return self._fetch_first(
            self._get_events,
            dict(sort_by='start', sort_order='asc',
                 newer_than=datetime.datetime.utcnow().isoformat(),
                 page_size=1),
            'no events found')

    # ------------------------------------------------------------------
    # Dice and unit conversion
    # ------------------------------------------------------------------

    def roll(self, formula, plaintext=False):
        """Execute a roll formula; an empty formula repeats the last one.

        The formula is remembered only after it executed successfully.

        Raises:
            CommandError: If Roll20 cannot interpret or execute the formula.
        """
        if not formula:
            formula = self.last_roll_formula
        try:
            result = Roll20.execute(formula, plaintext)
        except Roll20Error:
            raise CommandError('failed to interpret or execute the formula')
        self.last_roll_formula = formula
        return result

    def convert(self, expression, unit=None):
        """Convert ``expression`` (optionally to ``unit``) with current rates.

        Raises:
            CommandError: If the exchange rates cannot be obtained or the
                conversion fails.
        """
        c = Converter()
        try:
            rates = self.rates.query()
        except ExchangeRatesError as e:
            # Keep rate-service failures inside the CommandError contract
            # instead of leaking a service-specific exception to callers.
            raise CommandError(str(e))
        c.update_currencies(*rates)
        try:
            return c.convert(expression, unit)
        except ConverterError as e:
            raise CommandError(str(e))

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _save_quote(self, id, text, game, date):
        """Build the quote payload and post it (shared by add/edit)."""
        payload = dict(
            id=int(id),
            date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
            game=game,
            text=text)
        try:
            self._post_quotes(payload)
        except requests.exceptions.RequestException as e:
            raise CommandError(e)

    def _fetch_first(self, fetch, params, error_message):
        """Return the first item of the JSON list produced by ``fetch(params)``.

        Raises:
            CommandError: With ``error_message`` if the request fails or the
                list is empty.
        """
        try:
            return fetch(params).json()[0]
        except (requests.exceptions.RequestException, IndexError):
            raise CommandError(error_message)

    def _send(self, http_call, section, path, url_option='api_url',
              key_header=None, **kwargs):
        """Perform one HTTP call against a configured API and return the response.

        Args:
            http_call: ``requests.get``, ``requests.post`` or
                ``requests.delete``.
            section: Config section holding the API settings.
            path: Path appended to the base URL.
            url_option: Name of the option holding the base URL.
            key_header: If given, the section's ``api_key`` is sent in a
                header of this name.
            **kwargs: Passed on to ``http_call`` (``params``, ``data``, ...).

        Raises:
            requests.exceptions.RequestException: On network failure,
                timeout, or an error status code.
        """
        settings = self.config[section]
        url = '{0}/{1}'.format(settings.get(url_option), path)
        if key_header is not None:
            kwargs['headers'] = {key_header: settings.get('api_key')}
        kwargs.setdefault('timeout', REQUEST_TIMEOUT)
        r = http_call(url, **kwargs)
        r.raise_for_status()
        return r

    # ------------------------------------------------------------------
    # Thin per-endpoint wrappers (each returns the ``requests`` response)
    # ------------------------------------------------------------------

    def _get_instagram_media(self, params):
        """GET ``/media`` from the Instagram API."""
        return self._send(requests.get, 'Instagram', 'media', params=params)

    def _get_quotes(self, params):
        """GET ``/quotes`` from the quotes API."""
        return self._send(requests.get, 'Quotes', 'quotes', params=params)

    def _post_quotes(self, data):
        """POST ``data`` to ``/quotes`` using the quotes API key."""
        return self._send(requests.post, 'Quotes', 'quotes',
                          key_header='X-Quotes-API-Key', data=data)

    def _delete_quotes(self, id):
        """DELETE ``/quotes/<id>`` using the quotes API key."""
        return self._send(requests.delete, 'Quotes', 'quotes/{0}'.format(id),
                          key_header='X-Quotes-API-Key')

    def _get_regular_subs(self, params):
        """GET ``/regular-subs`` from the subs API."""
        return self._send(requests.get, 'TwitchSubs', 'regular-subs',
                          params=params)

    def _post_regular_subs(self, data):
        """POST ``data`` to ``/regular-subs`` using the subs API key."""
        return self._send(requests.post, 'TwitchSubs', 'regular-subs',
                          key_header='X-Twitch-Subs-API-Key', data=data)

    def _get_gifted_subs(self, params):
        """GET ``/gifted-subs`` from the subs API."""
        return self._send(requests.get, 'TwitchSubs', 'gifted-subs',
                          params=params)

    def _post_gifted_subs(self, data):
        """POST ``data`` to ``/gifted-subs`` using the subs API key."""
        return self._send(requests.post, 'TwitchSubs', 'gifted-subs',
                          key_header='X-Twitch-Subs-API-Key', data=data)

    def _get_comments(self, params):
        """GET ``/search`` from the Twitch cache API."""
        return self._send(requests.get, 'Twitch', 'search',
                          url_option='cache_api_url', params=params)

    def _get_messages(self, commenter, term):
        """Return ``(message_body, time)`` pairs for matching cached comments.

        ``time`` is the video's recording time plus the message offset in
        seconds, or ``None`` when the recording time is missing.

        Raises:
            CommandError: If the request fails.
        """
        def get_time(message):
            recorded = message.get('video_recorded_at')
            if not recorded:
                return None
            # ``or 0`` also covers an ``offset`` key that is present but null.
            offset = message.get('offset') or 0
            return (dateutil.parser.parse(recorded)
                    + datetime.timedelta(seconds=float(offset)))

        try:
            messages = self._get_comments(dict(
                commenter=commenter,
                term=term)).json()
        except requests.exceptions.RequestException as e:
            raise CommandError(e)
        return [(m.get('message_body', ''), get_time(m)) for m in messages]

    def _get_clips(self, params):
        """GET ``/clips`` from the Twitch cache API."""
        return self._send(requests.get, 'Twitch', 'clips',
                          url_option='cache_api_url', params=params)

    def _get_events(self, params):
        """GET ``/events`` from the Twitch cache API."""
        return self._send(requests.get, 'Twitch', 'events',
                          url_option='cache_api_url', params=params)