You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

296 lines
10 KiB

import datetime
import dateutil.parser
import requests
from services.youtube import Youtube, YoutubeError
from services.cheesecom import CheeseCom, CheeseComError
from services.roll20 import Roll20, Roll20Error
# Base URL that Instagram post and profile links are built from.
INSTAGRAM_BASE_URL = 'https://www.instagram.com'
# Base URL that Twitch clip links are built from (followed by the clip slug).
CLIPS_BASE_URL = 'https://clips.twitch.tv'
class CommandError(Exception):
    """Raised when a command cannot be carried out.

    Carries either the underlying exception or a short human-readable
    message as its single argument.
    """
class Commands(object):
    """Backend operations behind the bot's commands.

    The public methods talk to the configured HTTP APIs (quotes, Twitch
    subscriptions, Instagram media, Twitch cache) or to the Youtube,
    CheeseCom and Roll20 service wrappers. Failures reported by those
    backends are re-raised as CommandError.
    """

    def __init__(self, config, logger):
        """Keep the configuration and logger for later use.

        ``config`` is indexed by section name (for example
        ``config['Quotes']``) and every section is read with
        ``.get(option)``.
        """
        self.config = config
        self.logger = logger
        # Formula of the last successful roll; roll() falls back to it when
        # it is called without a formula.
        self.last_roll_formula = None

    # ------------------------------------------------------------------
    # Quotes
    # ------------------------------------------------------------------

    def add_quote(self, id, text, game, date):
        """Create a quote through the quotes API.

        ``id`` is converted with int(); ``date`` is parsed day-first and
        sent as an ISO date string.

        Raises CommandError when the API answers with an HTTP error.
        Conversion errors from int() or the date parser are not wrapped.
        """
        self._submit_quote(id, text, game, date)

    def edit_quote(self, id, text, game, date):
        """Update a quote; issues the same POST request as add_quote().

        Raises CommandError when the API answers with an HTTP error.
        """
        self._submit_quote(id, text, game, date)

    def _submit_quote(self, id, text, game, date):
        """POST one quote record; shared by add_quote() and edit_quote()."""
        try:
            self._post_quotes(dict(
                id=int(id),
                date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
                game=game,
                text=text))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def remove_quote(self, id):
        """Delete the quote with the given id (converted with int()).

        Raises CommandError when the API answers with an HTTP error.
        """
        try:
            self._delete_quotes(int(id))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    # ------------------------------------------------------------------
    # Subscriptions
    # ------------------------------------------------------------------

    def record_regular_sub(self, user, rank, time=None):
        """Record a regular subscription.

        ``time`` defaults to the current (naive) UTC time and is sent in
        ISO format. Raises CommandError on an HTTP error.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        try:
            self._post_regular_subs(dict(
                user=user,
                rank=rank,
                time=time.isoformat()))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def record_gifted_sub(self, giver, receiver, time=None):
        """Record a gifted subscription.

        ``time`` defaults to the current (naive) UTC time and is sent in
        ISO format. Raises CommandError on an HTTP error.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        try:
            self._post_gifted_subs(dict(
                giver=giver,
                receiver=receiver,
                time=time.isoformat()))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    # ------------------------------------------------------------------
    # Chat message lookups
    # ------------------------------------------------------------------

    def get_quote_messages(self):
        """Return (message body, time) pairs for 'quote' chat messages.

        Only messages timed on or after 2018-05-11 are returned. Messages
        without a time are dropped as well: they cannot be shown to be
        recent enough, and comparing None with a datetime raises TypeError.

        Raises CommandError when the message search fails.
        """
        messages = self._get_messages('bellateeny', 'quote')
        # quotes before this date can have wrong IDs
        threshold = datetime.datetime(2018, 5, 11)
        # NOTE(review): threshold is naive; this assumes the parsed message
        # times are naive too -- confirm against the cache API's output.
        return [(m, t) for m, t in messages
                if t is not None and t >= threshold]

    def get_sub_messages(self):
        """Return (message body, time) pairs for subscription messages.

        Combines the results of the 'cheese hype' and 'gift' searches, in
        that order. Raises CommandError when a search fails.
        """
        messages = self._get_messages('bellateeny', 'cheese hype')
        messages.extend(self._get_messages('bellateeny', 'gift'))
        return messages

    def plausible_gifted_subs(self, time):
        """Return gifted subs recorded in the 31 days before ``time``.

        The result is the decoded JSON response, requested newest first.
        Raises CommandError when the API answers with an HTTP error.
        """
        try:
            gifted_subs = self._get_gifted_subs(dict(
                older_than=time.isoformat(),
                newer_than=(time - datetime.timedelta(days=31)).isoformat(),
                sort_by='time',
                sort_order='desc')).json()
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no gifted subs found')
        return gifted_subs

    # ------------------------------------------------------------------
    # Single-result lookups
    # ------------------------------------------------------------------

    def _first_result(self, fetch, params, not_found):
        """Return the first item of the list that ``fetch(params)`` yields.

        ``fetch`` is one of the ``_get_*`` methods; its response is decoded
        with .json(). Raises CommandError(not_found) on an HTTP error or
        when the decoded list is empty.
        """
        try:
            return fetch(params).json().pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError(not_found)

    def last_quote(self):
        """Return the quote with the highest id.

        Raises CommandError when the request fails or no quote exists.
        """
        return self._first_result(
            self._get_quotes,
            dict(sort_by='id', sort_order='desc', page_size=1),
            'no quotes found')

    def find_quote(self, filter):
        """Return one random quote matching ``filter``.

        Raises CommandError when ``filter`` is shorter than three
        characters, the request fails, or nothing matches.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        return self._first_result(
            self._get_quotes,
            dict(filter=filter, sort_order='random', page_size=1),
            'no quotes found')

    def bellagram(self):
        """Return one random Instagram image matching the configured keyword.

        The returned record gains two keys: 'url' (link to the post) and
        'owner_url' (link to the owner's profile).

        Raises CommandError when the request fails or nothing matches.
        """
        keyword = self.config['Instagram'].get('keyword')
        bellagram = self._first_result(
            self._get_instagram_media,
            dict(filter=keyword, type='GraphImage', sort_order='random',
                 page_size=1),
            'no bellagrams found')
        bellagram['url'] = '{0}/p/{1}'.format(
            INSTAGRAM_BASE_URL, bellagram['shortcode'])
        bellagram['owner_url'] = '{0}/{1}'.format(
            INSTAGRAM_BASE_URL, bellagram['owner_username'])
        return bellagram

    def find_clip(self, filter):
        """Return one random clip matching ``filter``, with a 'url' key added.

        Raises CommandError when ``filter`` is shorter than three
        characters, the request fails, or nothing matches.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        clip = self._first_result(
            self._get_clips,
            dict(filter=filter, sort_order='random', page_size=1),
            'no clips found')
        clip['url'] = '{0}/{1}'.format(CLIPS_BASE_URL, clip['slug'])
        return clip

    def next_stream(self):
        """Return the earliest event starting after the current UTC time.

        Raises CommandError when the request fails or no event is found.
        """
        return self._first_result(
            self._get_events,
            dict(sort_by='start',
                 sort_order='asc',
                 newer_than=datetime.datetime.utcnow().isoformat(),
                 page_size=1),
            'no events found')

    # ------------------------------------------------------------------
    # External services
    # ------------------------------------------------------------------

    def query_youtube(self, query):
        """Return the best Youtube match for ``query``.

        Searches the channels listed (comma-separated) in the 'Youtube'
        config section. Raises CommandError on a YoutubeError.
        """
        section = self.config['Youtube']
        api_key = section.get('api_key')
        channel_ids = section.get('channel_ids').split(',')
        yt = Youtube(api_key)
        try:
            return yt.find_best_match(channel_ids, query)
        except YoutubeError:
            raise CommandError('couldn\'t find anything on Youtube')

    def query_cheese_com(self, query):
        """Return the cheese.com result for ``query``.

        Raises CommandError on a CheeseComError.
        """
        cc = CheeseCom()
        try:
            return cc.query(query)
        except CheeseComError:
            raise CommandError('couldn\'t find exact match on cheese.com')

    def roll(self, formula, plaintext=False):
        """Execute a dice formula through Roll20 and return its result.

        An empty ``formula`` repeats the last successfully executed one.
        Raises CommandError on a Roll20Error.
        """
        if not formula:
            # NOTE(review): this is still None when no roll has succeeded
            # yet; Roll20.execute is then called with None.
            formula = self.last_roll_formula
        try:
            result = Roll20.execute(formula, plaintext)
        except Roll20Error:
            raise CommandError('failed to interpret or execute the formula')
        # Remember the formula only once it has executed successfully.
        self.last_roll_formula = formula
        return result

    # ------------------------------------------------------------------
    # HTTP plumbing
    # ------------------------------------------------------------------

    def _call_api(self, verb, section, path, url_option='api_url',
                  key_header=None, **kwargs):
        """Send one request to a configured API and return the response.

        verb       -- requests function to call (requests.get, .post, ...)
        section    -- config section holding the API settings
        path       -- resource path appended to the API base URL
        url_option -- option in ``section`` that holds the base URL
        key_header -- when given, the section's 'api_key' is sent in a
                      header of this name
        kwargs     -- passed through to ``verb`` (params=..., data=...)

        Raises requests.exceptions.HTTPError for error status codes.
        """
        settings = self.config[section]
        url = '{0}/{1}'.format(settings.get(url_option), path)
        if key_header is not None:
            kwargs['headers'] = {key_header: settings.get('api_key')}
        response = verb(url, **kwargs)
        response.raise_for_status()
        return response

    def _get_instagram_media(self, params):
        """GET /media from the Instagram API."""
        return self._call_api(requests.get, 'Instagram', 'media',
                              params=params)

    def _get_quotes(self, params):
        """GET /quotes from the quotes API."""
        return self._call_api(requests.get, 'Quotes', 'quotes',
                              params=params)

    def _post_quotes(self, data):
        """POST /quotes to the quotes API, authenticated with its API key."""
        return self._call_api(requests.post, 'Quotes', 'quotes',
                              key_header='X-Quotes-API-Key', data=data)

    def _delete_quotes(self, id):
        """DELETE /quotes/<id> on the quotes API, authenticated with its key."""
        return self._call_api(requests.delete, 'Quotes',
                              'quotes/{0}'.format(id),
                              key_header='X-Quotes-API-Key')

    def _get_regular_subs(self, params):
        """GET /regular-subs from the subscriptions API."""
        return self._call_api(requests.get, 'TwitchSubs', 'regular-subs',
                              params=params)

    def _post_regular_subs(self, data):
        """POST /regular-subs to the subscriptions API with its API key."""
        return self._call_api(requests.post, 'TwitchSubs', 'regular-subs',
                              key_header='X-Twitch-Subs-API-Key', data=data)

    def _get_gifted_subs(self, params):
        """GET /gifted-subs from the subscriptions API."""
        return self._call_api(requests.get, 'TwitchSubs', 'gifted-subs',
                              params=params)

    def _post_gifted_subs(self, data):
        """POST /gifted-subs to the subscriptions API with its API key."""
        return self._call_api(requests.post, 'TwitchSubs', 'gifted-subs',
                              key_header='X-Twitch-Subs-API-Key', data=data)

    def _get_comments(self, params):
        """GET /search from the Twitch cache API."""
        return self._call_api(requests.get, 'Twitch', 'search',
                              url_option='cache_api_url', params=params)

    @staticmethod
    def _message_time(message):
        """Return when a chat message was posted, or None if unknown.

        The time is the message's 'video_recorded_at' value plus its
        'offset' in seconds (0 when the offset is absent).
        """
        recorded = message.get('video_recorded_at')
        if not recorded:
            return None
        result = dateutil.parser.parse(recorded)
        result += datetime.timedelta(seconds=float(message.get('offset', 0)))
        return result

    def _get_messages(self, commenter, term):
        """Search chat messages by commenter and term.

        Returns a list of (message body, time) tuples; the time is None
        for messages that carry no recording time.

        Raises CommandError when the search answers with an HTTP error.
        """
        try:
            messages = self._get_comments(dict(
                commenter=commenter,
                term=term)).json()
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)
        return [(m.get('message_body', ''), self._message_time(m))
                for m in messages]

    def _get_clips(self, params):
        """GET /clips from the Twitch cache API."""
        return self._call_api(requests.get, 'Twitch', 'clips',
                              url_option='cache_api_url', params=params)

    def _get_events(self, params):
        """GET /events from the Twitch cache API."""
        return self._call_api(requests.get, 'Twitch', 'events',
                              url_option='cache_api_url', params=params)