You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
147 lines
4.9 KiB
147 lines
4.9 KiB
import random
|
|
|
|
import dateutil.parser
|
|
import requests
|
|
|
|
from services.instagram import Instagram, InstagramError
|
|
from services.twitch import Twitch, TwitchError
|
|
from services.youtube import Youtube, YoutubeError
|
|
|
|
|
|
class CommandError(Exception):
|
|
pass
|
|
|
|
|
|
class Commands(object):
|
|
def __init__(self, config, logger):
|
|
self.config = config
|
|
self.logger = logger
|
|
self._bellagrams = self._collect_bellagrams()
|
|
|
|
def add_quote(self, id, text, game, date):
|
|
try:
|
|
self._post_quotes(dict(
|
|
id=int(id),
|
|
date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
|
|
game=game,
|
|
text=text))
|
|
except requests.exceptions.HTTPError as e:
|
|
raise CommandError(e)
|
|
|
|
def edit_quote(self, id, text, game, date):
|
|
try:
|
|
self._post_quotes(dict(
|
|
id=int(id),
|
|
date=dateutil.parser.parse(date, dayfirst=True).date().isoformat(),
|
|
game=game,
|
|
text=text))
|
|
except requests.exceptions.HTTPError as e:
|
|
raise CommandError(e)
|
|
|
|
def remove_quote(self, id):
|
|
try:
|
|
self._delete_quotes(int(id))
|
|
except requests.exceptions.HTTPError as e:
|
|
raise CommandError(e)
|
|
|
|
def get_twitch_messages(self, since):
|
|
if since is None:
|
|
try:
|
|
quotes = self._get_quotes(dict(
|
|
sort_by='id',
|
|
sort_order='desc',
|
|
page_size=1)).json()
|
|
quote = quotes.pop(0)
|
|
except requests.exceptions.HTTPError:
|
|
raise CommandError(e)
|
|
except IndexError:
|
|
raise CommandError(e)
|
|
else:
|
|
since = quote['date']
|
|
api_url = self.config['Twitch'].get('api_url')
|
|
client_id = self.config['Twitch'].get('client_id')
|
|
channel_id = self.config['Twitch'].getint('channel_id')
|
|
since = dateutil.parser.parse(since).date()
|
|
twitch = Twitch(api_url, client_id)
|
|
try:
|
|
return twitch.get_messages(channel_id, since)
|
|
except TwitchError as e:
|
|
raise CommandError(e)
|
|
|
|
def last_quote(self):
|
|
try:
|
|
quotes = self._get_quotes(dict(
|
|
sort_by='id',
|
|
sort_order='desc',
|
|
page_size=1)).json()
|
|
quote = quotes.pop(0)
|
|
except (requests.exceptions.HTTPError, IndexError):
|
|
raise CommandError('no quotes found')
|
|
else:
|
|
return quote
|
|
|
|
def find_quote(self, filter):
|
|
if len(filter) < 3:
|
|
raise CommandError('the search phrase is too short')
|
|
try:
|
|
quotes = self._get_quotes(dict(
|
|
filter=filter,
|
|
sort_order='random',
|
|
page_size=1)).json()
|
|
quote = quotes.pop(0)
|
|
except (requests.exceptions.HTTPError, IndexError):
|
|
raise CommandError('no quotes found')
|
|
else:
|
|
return quote
|
|
|
|
def bellagram(self):
|
|
if not self._bellagrams:
|
|
raise CommandError('couldn\'t get any media from Instagram')
|
|
return random.choice(self._bellagrams)
|
|
|
|
def query_youtube(self, query):
|
|
api_key = self.config['Youtube'].get('api_key')
|
|
channel_ids = self.config['Youtube'].get('channel_ids').split(',')
|
|
yt = Youtube(api_key)
|
|
try:
|
|
result = yt.find_best_match(channel_ids, query)
|
|
except YoutubeError:
|
|
raise CommandError('couldn\'t find anything on Youtube')
|
|
else:
|
|
return result
|
|
|
|
def _get_quotes(self, params):
|
|
api_url = self.config['Quotes'].get('api_url')
|
|
r = requests.get('{0}/quotes'.format(api_url), params=params)
|
|
r.raise_for_status()
|
|
return r
|
|
|
|
def _post_quotes(self, data):
|
|
api_url = self.config['Quotes'].get('api_url')
|
|
api_key = self.config['Quotes'].get('api_key')
|
|
r = requests.post('{0}/quotes'.format(api_url), data=data,
|
|
headers={'X-Quotes-API-Key': api_key})
|
|
r.raise_for_status()
|
|
return r
|
|
|
|
def _delete_quotes(self, id):
|
|
api_url = self.config['Quotes'].get('api_url')
|
|
api_key = self.config['Quotes'].get('api_key')
|
|
r = requests.delete('{0}/quotes/{1}'.format(api_url, id),
|
|
headers={'X-Quotes-API-Key': api_key})
|
|
r.raise_for_status()
|
|
return r
|
|
|
|
def _collect_bellagrams(self):
|
|
username = self.config['Instagram'].get('username')
|
|
keywords = self.config['Instagram'].get('keywords').split(',')
|
|
instagram = Instagram(username)
|
|
try:
|
|
media = instagram.get_media()
|
|
media = [m for m in media if [k for k in keywords \
|
|
if m['type'] == 'Image' and k.lower() in m['title'].lower()]]
|
|
except (InstagramError, IndexError):
|
|
return None
|
|
else:
|
|
return media
|