You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

292 lines
9.9 KiB

6 years ago
import datetime
import dateutil.parser
import requests
from services.youtube import Youtube, YoutubeError
from services.cheesecom import CheeseCom, CheeseComError
from services.roll20 import Roll20, Roll20Error
INSTAGRAM_BASE_URL = 'https://www.instagram.com'
CLIPS_BASE_URL = 'https://clips.twitch.tv'
class CommandError(Exception):
pass
class Commands(object):
def __init__(self, config, logger):
self.config = config
self.logger = logger
def add_quote(self, id, text, game, date):
try:
self._post_quotes(dict(
id=int(id),
date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
game=game,
text=text))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def edit_quote(self, id, text, game, date):
try:
self._post_quotes(dict(
id=int(id),
date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
game=game,
text=text))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def remove_quote(self, id):
try:
self._delete_quotes(int(id))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def record_regular_sub(self, user, rank, time=None):
if time is None:
time = datetime.datetime.utcnow()
try:
self._post_regular_subs(dict(
user=user,
rank=rank,
time=time.isoformat()))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def record_gifted_sub(self, giver, receiver, time=None):
if time is None:
time = datetime.datetime.utcnow()
try:
self._post_gifted_subs(dict(
giver=giver,
receiver=receiver,
time=time.isoformat()))
except requests.exceptions.HTTPError as e:
raise CommandError(e)
def get_quote_messages(self):
try:
messages = self._get_messages('bellateeny', 'quote')
except CommandError:
raise
else:
6 years ago
# quotes before this date can have wrong IDs
threshold = datetime.datetime(2018, 5, 11)
messages = [(m, t) for m, t in messages if t >= threshold]
return messages
def get_sub_messages(self):
try:
messages = self._get_messages('bellateeny', 'cheese hype')
messages.extend(self._get_messages('bellateeny', 'gift'))
except CommandError:
raise
else:
return messages
def plausible_gifted_subs(self, time):
try:
gifted_subs = self._get_gifted_subs(dict(
older_than=time.isoformat(),
newer_than=(time - datetime.timedelta(days=31)).isoformat(),
sort_by='time',
sort_order='desc')).json()
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no gifted subs found')
else:
return gifted_subs
def last_quote(self):
try:
quotes = self._get_quotes(dict(
sort_by='id',
sort_order='desc',
page_size=1)).json()
quote = quotes.pop(0)
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no quotes found')
else:
return quote
def find_quote(self, filter):
if len(filter) < 3:
raise CommandError('the search phrase is too short')
try:
quotes = self._get_quotes(dict(
filter=filter,
sort_order='random',
page_size=1)).json()
quote = quotes.pop(0)
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no quotes found')
else:
return quote
def bellagram(self):
keyword = self.config['Instagram'].get('keyword')
try:
media = self._get_instagram_media(dict(
filter=keyword,
type='GraphImage',
sort_order='random',
page_size=1)).json()
bellagram = media.pop(0)
bellagram['url'] = '{0}/p/{1}'.format(INSTAGRAM_BASE_URL, bellagram['shortcode'])
bellagram['owner_url'] = '{0}/{1}'.format(INSTAGRAM_BASE_URL, bellagram['owner_username'])
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no bellagrams found')
else:
return bellagram
def query_youtube(self, query):
api_key = self.config['Youtube'].get('api_key')
channel_ids = self.config['Youtube'].get('channel_ids').split(',')
yt = Youtube(api_key)
try:
result = yt.find_best_match(channel_ids, query)
except YoutubeError:
raise CommandError('couldn\'t find anything on Youtube')
else:
return result
def find_clip(self, filter):
if len(filter) < 3:
raise CommandError('the search phrase is too short')
try:
clips = self._get_clips(dict(
filter=filter,
sort_order='random',
page_size=1)).json()
clip = clips.pop(0)
clip['url'] = '{0}/{1}'.format(CLIPS_BASE_URL, clip['slug'])
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no clips found')
else:
return clip
def query_cheese_com(self, query):
cc = CheeseCom()
try:
result = cc.query(query)
except CheeseComError:
raise CommandError('couldn\'t find exact match on cheese.com')
else:
return result
def next_stream(self):
try:
events = self._get_events(dict(
sort_by='start',
sort_order='asc',
newer_than=datetime.datetime.utcnow().isoformat(),
page_size=1)).json()
event = events.pop(0)
except (requests.exceptions.HTTPError, IndexError):
raise CommandError('no events found')
else:
return event
def roll(self, formula):
try:
result = Roll20.execute(formula)
except Roll20Error:
raise CommandError('failed to interpret the formula')
else:
return result
def _get_instagram_media(self, params):
api_url = self.config['Instagram'].get('api_url')
r = requests.get('{0}/media'.format(api_url), params=params)
r.raise_for_status()
return r
def _get_quotes(self, params):
api_url = self.config['Quotes'].get('api_url')
r = requests.get('{0}/quotes'.format(api_url), params=params)
r.raise_for_status()
return r
def _post_quotes(self, data):
api_url = self.config['Quotes'].get('api_url')
api_key = self.config['Quotes'].get('api_key')
r = requests.post('{0}/quotes'.format(api_url), data=data,
headers={'X-Quotes-API-Key': api_key})
r.raise_for_status()
return r
def _delete_quotes(self, id):
api_url = self.config['Quotes'].get('api_url')
api_key = self.config['Quotes'].get('api_key')
r = requests.delete('{0}/quotes/{1}'.format(api_url, id),
headers={'X-Quotes-API-Key': api_key})
r.raise_for_status()
return r
def _get_regular_subs(self, params):
api_url = self.config['TwitchSubs'].get('api_url')
r = requests.get('{0}/regular-subs'.format(api_url), params=params)
r.raise_for_status()
return r
def _post_regular_subs(self, data):
api_url = self.config['TwitchSubs'].get('api_url')
api_key = self.config['TwitchSubs'].get('api_key')
r = requests.post('{0}/regular-subs'.format(api_url), data=data,
headers={'X-Twitch-Subs-API-Key': api_key})
r.raise_for_status()
return r
def _get_gifted_subs(self, params):
api_url = self.config['TwitchSubs'].get('api_url')
r = requests.get('{0}/gifted-subs'.format(api_url), params=params)
r.raise_for_status()
return r
def _post_gifted_subs(self, data):
api_url = self.config['TwitchSubs'].get('api_url')
api_key = self.config['TwitchSubs'].get('api_key')
r = requests.post('{0}/gifted-subs'.format(api_url), data=data,
headers={'X-Twitch-Subs-API-Key': api_key})
r.raise_for_status()
return r
def _get_comments(self, params):
api_url = self.config['Twitch'].get('cache_api_url')
r = requests.get('{0}/search'.format(api_url), params=params)
r.raise_for_status()
return r
def _get_messages(self, commenter, term):
def get_time(message):
recorded = message.get('video_recorded_at')
if not recorded:
return None
result = dateutil.parser.parse(recorded)
result += datetime.timedelta(seconds=float(message.get('offset', 0)))
return result
try:
messages = self._get_comments(dict(
commenter=commenter,
term=term)).json()
except requests.exceptions.HTTPError as e:
raise CommandError(e)
else:
return [(m.get('message_body', ''), get_time(m)) for m in messages]
def _get_clips(self, params):
api_url = self.config['Twitch'].get('cache_api_url')
r = requests.get('{0}/clips'.format(api_url), params=params)
r.raise_for_status()
return r
def _get_events(self, params):
api_url = self.config['Twitch'].get('cache_api_url')
r = requests.get('{0}/events'.format(api_url), params=params)
r.raise_for_status()
return r