You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
254 lines
8.7 KiB
254 lines
8.7 KiB
import datetime
|
|
|
|
import dateutil.parser
|
|
import requests
|
|
|
|
from services.youtube import Youtube, YoutubeError
|
|
|
|
|
|
# Root of the Instagram site; Commands.bellagram() appends '/p/<shortcode>'
# and '/<owner_username>' to it to build post and profile links.
INSTAGRAM_BASE_URL = 'https://www.instagram.com'
|
|
# Root of the Twitch clips site; Commands.find_clip() appends '/<slug>' to it
# to build the clip link.
CLIPS_BASE_URL = 'https://clips.twitch.tv'
|
|
|
|
|
|
class CommandError(Exception):
    """Raised by ``Commands`` methods when a command cannot be completed.

    Carries either the underlying ``requests`` HTTP error or a short
    human-readable message such as ``'no quotes found'``.
    """
|
|
|
|
|
|
class Commands(object):
    """Backend for the bot commands: quotes, subs, Instagram, Youtube, clips.

    Each public method calls one of the HTTP APIs named in ``config`` and
    turns an HTTP error status (``requests.exceptions.HTTPError``) into a
    ``CommandError``. Other ``requests`` failures (connection errors and
    the like) are not caught here and propagate unchanged.

    The private ``_get_*`` / ``_post_*`` / ``_delete_*`` helpers perform a
    single request, call ``raise_for_status()`` and return the raw
    ``requests`` response.
    """

    def __init__(self, config, logger):
        """Store the configuration and the logger on the instance.

        :param config: mapping of section name ('Quotes', 'TwitchSubs',
            'Twitch', 'Instagram', 'Youtube') to an object exposing
            ``get(key)``.
        :param logger: kept as ``self.logger``.
        """
        self.config = config
        self.logger = logger

    @staticmethod
    def _quote_payload(quote_id, text, game, date):
        """Build the form payload shared by ``add_quote`` and ``edit_quote``.

        ``date`` is parsed with ``dayfirst=True`` and reduced to an ISO
        calendar date; ``quote_id`` is coerced with ``int()``.
        """
        return dict(
            id=int(quote_id),
            date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
            game=game,
            text=text)

    def add_quote(self, id, text, game, date):
        """Post a quote to the quotes API.

        :param id: quote number; anything ``int()`` accepts.
        :param text: quote text.
        :param game: game the quote belongs to.
        :param date: date string, parsed day-first by ``dateutil``.
        :raises CommandError: when the API answers with an HTTP error status.

        Conversion errors from ``int()`` or the date parser are not
        translated and propagate to the caller.
        """
        payload = self._quote_payload(id, text, game, date)
        try:
            self._post_quotes(payload)
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def edit_quote(self, id, text, game, date):
        """Post a replacement for an existing quote.

        Sends exactly the same request as ``add_quote``.
        NOTE(review): presumably the API overwrites the quote with the
        same id -- confirm against the quotes service.

        :raises CommandError: when the API answers with an HTTP error status.
        """
        payload = self._quote_payload(id, text, game, date)
        try:
            self._post_quotes(payload)
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def remove_quote(self, id):
        """Delete the quote with the given id.

        :raises CommandError: when the API answers with an HTTP error status.
        """
        try:
            self._delete_quotes(int(id))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def record_regular_sub(self, user, rank, time=None):
        """Record a regular subscription.

        :param user: subscriber name.
        :param rank: rank value forwarded to the API as is.
        :param time: ``datetime`` of the sub; defaults to the current time
            as returned by ``datetime.datetime.utcnow()``.
        :raises CommandError: when the API answers with an HTTP error status.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        try:
            self._post_regular_subs(dict(
                user=user,
                rank=rank,
                time=time.isoformat()))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def record_gifted_sub(self, giver, receiver, time=None):
        """Record a gifted subscription.

        :param giver: name of the user who gifted the sub.
        :param receiver: name of the user who received it.
        :param time: ``datetime`` of the gift; defaults to the current time
            as returned by ``datetime.datetime.utcnow()``.
        :raises CommandError: when the API answers with an HTTP error status.
        """
        if time is None:
            time = datetime.datetime.utcnow()
        try:
            self._post_gifted_subs(dict(
                giver=giver,
                receiver=receiver,
                time=time.isoformat()))
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)

    def get_quote_messages(self):
        """Return the cached 'quote' chat messages that have usable IDs.

        :returns: list of ``(message_body, time)`` tuples whose time is on
            or after 2018-05-11.
        :raises CommandError: propagated from ``_get_messages``.
        """
        # quotes before this date can have wrong IDs
        threshold = datetime.datetime(2018, 5, 11)
        messages = self._get_messages('bellateeny', 'quote')
        # _get_messages yields None for messages without a recording time;
        # those cannot be compared with the threshold, so they are skipped.
        return [(m, t) for m, t in messages
                if t is not None and t >= threshold]

    def get_sub_messages(self):
        """Return the cached 'cheese hype' messages followed by 'gift' ones.

        :returns: list of ``(message_body, time)`` tuples; ``time`` may be
            ``None``.
        :raises CommandError: propagated from ``_get_messages``.
        """
        messages = self._get_messages('bellateeny', 'cheese hype')
        messages.extend(self._get_messages('bellateeny', 'gift'))
        return messages

    def plausible_gifted_subs(self, time):
        """Return the gifted subs recorded in the 31 days before ``time``.

        :param time: ``datetime`` marking the upper bound of the window.
        :returns: the decoded JSON body, requested newest first; an empty
            result is returned as is.
        :raises CommandError: when the API answers with an HTTP error status.
        """
        try:
            gifted_subs = self._get_gifted_subs(dict(
                older_than=time.isoformat(),
                newer_than=(time - datetime.timedelta(days=31)).isoformat(),
                sort_by='time',
                sort_order='desc')).json()
        except requests.exceptions.HTTPError:
            raise CommandError('no gifted subs found')
        return gifted_subs

    def last_quote(self):
        """Return the quote with the highest id.

        :raises CommandError: on an HTTP error status or an empty result.
        """
        try:
            quotes = self._get_quotes(dict(
                sort_by='id',
                sort_order='desc',
                page_size=1)).json()
            quote = quotes.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no quotes found')
        return quote

    def find_quote(self, filter):
        """Return one random quote matching ``filter``.

        :param filter: search phrase, at least three characters long.
        :raises CommandError: when the phrase is too short, on an HTTP
            error status, or when nothing matches.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        try:
            quotes = self._get_quotes(dict(
                filter=filter,
                sort_order='random',
                page_size=1)).json()
            quote = quotes.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no quotes found')
        return quote

    def bellagram(self):
        """Return one random Instagram image matching the configured keyword.

        The returned item gains two keys: ``url`` (link to the post) and
        ``owner_url`` (link to the owner's profile).

        :raises CommandError: on an HTTP error status or an empty result.
        """
        keyword = self.config['Instagram'].get('keyword')
        try:
            media = self._get_instagram_media(dict(
                filter=keyword,
                type='GraphImage',
                sort_order='random',
                page_size=1)).json()
            bellagram = media.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no bellagrams found')
        bellagram['url'] = '{0}/p/{1}'.format(
            INSTAGRAM_BASE_URL, bellagram['shortcode'])
        bellagram['owner_url'] = '{0}/{1}'.format(
            INSTAGRAM_BASE_URL, bellagram['owner_username'])
        return bellagram

    def query_youtube(self, query):
        """Return the best Youtube match for ``query`` in the configured channels.

        ``channel_ids`` is read from the config as a comma-separated list;
        whitespace around each id is ignored.

        :raises CommandError: when the Youtube service raises ``YoutubeError``.
        """
        settings = self.config['Youtube']
        api_key = settings.get('api_key')
        channel_ids = [channel_id.strip()
                       for channel_id in settings.get('channel_ids').split(',')]
        yt = Youtube(api_key)
        try:
            result = yt.find_best_match(channel_ids, query)
        except YoutubeError:
            raise CommandError('couldn\'t find anything on Youtube')
        return result

    def find_clip(self, filter):
        """Return one random Twitch clip matching ``filter``.

        The returned item gains a ``url`` key pointing at the clip.

        :param filter: search phrase, at least three characters long.
        :raises CommandError: when the phrase is too short, on an HTTP
            error status, or when nothing matches.
        """
        if len(filter) < 3:
            raise CommandError('the search phrase is too short')
        try:
            clips = self._get_clips(dict(
                filter=filter,
                sort_order='random',
                page_size=1)).json()
            clip = clips.pop(0)
        except (requests.exceptions.HTTPError, IndexError):
            raise CommandError('no clips found')
        clip['url'] = '{0}/{1}'.format(CLIPS_BASE_URL, clip['slug'])
        return clip

    def _get_instagram_media(self, params):
        """GET ``<Instagram api_url>/media``; raise on an HTTP error status."""
        api_url = self.config['Instagram'].get('api_url')
        r = requests.get('{0}/media'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _get_quotes(self, params):
        """GET ``<Quotes api_url>/quotes``; raise on an HTTP error status."""
        api_url = self.config['Quotes'].get('api_url')
        r = requests.get('{0}/quotes'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _post_quotes(self, data):
        """POST form ``data`` to ``<Quotes api_url>/quotes`` with the API key."""
        api_url = self.config['Quotes'].get('api_url')
        api_key = self.config['Quotes'].get('api_key')
        r = requests.post('{0}/quotes'.format(api_url), data=data,
                          headers={'X-Quotes-API-Key': api_key})
        r.raise_for_status()
        return r

    def _delete_quotes(self, id):
        """DELETE ``<Quotes api_url>/quotes/<id>`` with the API key."""
        api_url = self.config['Quotes'].get('api_url')
        api_key = self.config['Quotes'].get('api_key')
        r = requests.delete('{0}/quotes/{1}'.format(api_url, id),
                            headers={'X-Quotes-API-Key': api_key})
        r.raise_for_status()
        return r

    def _get_regular_subs(self, params):
        """GET ``<TwitchSubs api_url>/regular-subs``; raise on an HTTP error status."""
        api_url = self.config['TwitchSubs'].get('api_url')
        r = requests.get('{0}/regular-subs'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _post_regular_subs(self, data):
        """POST form ``data`` to ``<TwitchSubs api_url>/regular-subs`` with the API key."""
        api_url = self.config['TwitchSubs'].get('api_url')
        api_key = self.config['TwitchSubs'].get('api_key')
        r = requests.post('{0}/regular-subs'.format(api_url), data=data,
                          headers={'X-Twitch-Subs-API-Key': api_key})
        r.raise_for_status()
        return r

    def _get_gifted_subs(self, params):
        """GET ``<TwitchSubs api_url>/gifted-subs``; raise on an HTTP error status."""
        api_url = self.config['TwitchSubs'].get('api_url')
        r = requests.get('{0}/gifted-subs'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _post_gifted_subs(self, data):
        """POST form ``data`` to ``<TwitchSubs api_url>/gifted-subs`` with the API key."""
        api_url = self.config['TwitchSubs'].get('api_url')
        api_key = self.config['TwitchSubs'].get('api_key')
        r = requests.post('{0}/gifted-subs'.format(api_url), data=data,
                          headers={'X-Twitch-Subs-API-Key': api_key})
        r.raise_for_status()
        return r

    def _get_comments(self, params):
        """GET ``<Twitch cache_api_url>/search``; raise on an HTTP error status."""
        api_url = self.config['Twitch'].get('cache_api_url')
        r = requests.get('{0}/search'.format(api_url), params=params)
        r.raise_for_status()
        return r

    def _get_messages(self, commenter, term):
        """Search the cached chat comments of ``commenter`` for ``term``.

        :returns: list of ``(message_body, time)`` tuples. ``time`` is the
            parsed ``video_recorded_at`` plus ``offset`` seconds, or
            ``None`` when the message has no ``video_recorded_at``.
        :raises CommandError: when the API answers with an HTTP error status.
        """
        def get_time(message):
            recorded = message.get('video_recorded_at')
            if not recorded:
                return None
            result = dateutil.parser.parse(recorded)
            # A missing or null offset counts as zero seconds.
            result += datetime.timedelta(
                seconds=float(message.get('offset') or 0))
            return result

        try:
            messages = self._get_comments(dict(
                commenter=commenter,
                term=term)).json()
        except requests.exceptions.HTTPError as e:
            raise CommandError(e)
        return [(m.get('message_body', ''), get_time(m)) for m in messages]

    def _get_clips(self, params):
        """GET ``<Twitch cache_api_url>/clips``; raise on an HTTP error status."""
        api_url = self.config['Twitch'].get('cache_api_url')
        r = requests.get('{0}/clips'.format(api_url), params=params)
        r.raise_for_status()
        return r
|