You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
292 lines
12 KiB
292 lines
12 KiB
import functools
|
|
import re
|
|
import unicodedata
|
|
|
|
import irc.bot
|
|
|
|
from commands import CommandError
|
|
|
|
|
|
# Matches the chat confirmation for a newly added quote:
#   $username --> Sweet! Thanks for the quote! #$id: $response
# The response part is parsed as `<text> [<game>] [<date>]`.
# re.VERBOSE ignores the layout whitespace below, which is why literal
# spaces are spelled `\s+` and the literal `#` is escaped.
QUOTE_ADDED_PATTERN = re.compile(r'''^
    (?P<user>.+)\s+-->\s+
    Sweet!\s+Thanks\s+for\s+the\s+quote!\s+
    \#(?P<id>\d+):\s+
    (?P<text>.+)\s+
    \[(?P<game>.+)\]\s+
    \[(?P<date>.+)\]$''', re.VERBOSE)
|
|
|
|
# Matches the chat confirmation for an edited quote:
#   $username --> Successfully edited Quote #$id: $response
# The response part is parsed as `<text> [<game>] [<date>]`.
# re.VERBOSE ignores the layout whitespace below, which is why literal
# spaces are spelled `\s+` and the literal `#` is escaped.
QUOTE_EDITED_PATTERN = re.compile(r'''^
    (?P<user>.+)\s+-->\s+
    Successfully\s+edited\s+Quote\s+
    \#(?P<id>\d+):\s+
    (?P<text>.+)\s+
    \[(?P<game>.+)\]\s+
    \[(?P<date>.+)\]$''', re.VERBOSE)
|
|
|
|
# Matches the chat confirmation for a deleted quote:
#   $username --> Successfully deleted Quote #$id.
# re.VERBOSE ignores the layout whitespace below, which is why literal
# spaces are spelled `\s+` and the literal `#` is escaped.
QUOTE_REMOVED_PATTERN = re.compile(r'''^
    (?P<user>.+)\s+-->\s+
    Successfully\s+deleted\s+Quote\s+
    \#(?P<id>\d+)\.$''', re.VERBOSE)
|
|
|
|
# Matches the chat announcement of a new subscription:
#   $user has joined the Cheese Horde! CHEESE HYPE!!! ♥♥
# re.VERBOSE ignores the layout whitespace below, which is why literal
# spaces are spelled `\s+`.
SUB_PATTERN = re.compile(r'''^
    (?P<user>.+)\s+
    has\s+joined\s+the\s+Cheese\s+Horde!\s+
    CHEESE\s+HYPE!!!\s+♥♥$''', re.VERBOSE)
|
|
|
|
# Matches the chat announcement of a renewed subscription:
#   $user, Thank you for the $rank months of cheesy support! ♥♥♥♥ CHEESE HYPE!!!
# `rank` captures the number of months as a digit string.
# re.VERBOSE ignores the layout whitespace below, which is why literal
# spaces are spelled `\s+`.
RESUB_PATTERN = re.compile(r'''^
    (?P<user>.+),\s+
    Thank\s+you\s+for\s+the\s+
    (?P<rank>\d+)\s+
    months\s+of\s+cheesy\s+support!\s+
    ♥♥♥♥\s+CHEESE\s+HYPE!!!$''', re.VERBOSE)
|
|
|
|
# Matches the chat announcement of a gifted subscription:
#   $giver, Thank you for gifting a sub to $receiver! So kind <3 !
# re.VERBOSE ignores the layout whitespace below, which is why literal
# spaces are spelled `\s+`.
SUB_GIFTED_PATTERN = re.compile(r'''^
    (?P<giver>.+),\s+
    Thank\s+you\s+for\s+gifting\s+a\s+sub\s+to\s+
    (?P<receiver>.+)!\s+
    So\s+kind\s+<3\s+!$''', re.VERBOSE)
|
|
|
|
|
|
class TwitchClient(irc.bot.SingleServerIRCBot):
    """IRC bot that reads Twitch chat and dispatches messages to handlers.

    Every public message or whisper is matched against two tables of
    ``(compiled regex, handler)`` pairs: ``self.patterns`` (quote and
    subscription announcements) and ``self.supported_commands``
    (``!``-prefixed requests). A message equal to a key of
    ``extra_commands`` is answered with the stored response template.

    Handlers share the signature ``handler(tags, send_response, **groups)``
    where ``groups`` are the named groups of the matching regex.
    """

    # Badge names that unlock the restricted commands.
    _PRIVILEGED_BADGES = frozenset(['admin', 'broadcaster', 'moderator'])

    def __init__(self, config, logger, commands, extra_commands):
        """Store collaborators, build the dispatch tables and init the bot.

        Args:
            config: Sectioned configuration. The sections read by this class
                are ``IRC`` (``server``, ``port``, ``nickname``,
                ``channels``), ``Twitch`` (``token``) and ``Quotes``
                (``act_as_proxy``).
            logger: Logger used for all diagnostics.
            commands: Backend object providing the quote, subscription and
                search operations; failures are expected as ``CommandError``.
            extra_commands: Mutable mapping of exact message text to a
                response template; must also provide ``save()``.
        """
        self.config = config
        self.logger = logger
        self.commands = commands
        self.extra_commands = extra_commands
        self.patterns = [
            (QUOTE_ADDED_PATTERN, self._add_quote),
            (QUOTE_EDITED_PATTERN, self._edit_quote),
            (QUOTE_REMOVED_PATTERN, self._remove_quote),
            (SUB_PATTERN, self._record_sub),
            (RESUB_PATTERN, self._record_resub),
            (SUB_GIFTED_PATTERN, self._record_gifted_sub),
        ]
        # Arguments may optionally be wrapped in double quotes: the
        # conditional group `(?(q)")` requires a closing quote only when an
        # opening one was matched.
        self.supported_commands = [
            (re.compile(r'^!lastquote$'), self._do_lastquote),
            (re.compile(r'^!findquote\s+(?P<q>")?(?P<filter>.+)(?(q)")$'), self._do_findquote),
            (re.compile(r'^!syncquotes$'), self._do_syncquotes),
            (re.compile(r'^!syncsubs$'), self._do_syncsubs),
            (re.compile(r'^!(bella(gram|pics)|insta(gram|bella))$'), self._do_bellagram),
            (re.compile(r'^!yt\s+(?P<q>")?(?P<query>.+)(?(q)")$'), self._do_yt),
            (re.compile(r'^!clip\s+(?P<q>")?(?P<filter>.+)(?(q)")$'), self._do_clip),
            (re.compile(r'^!command\s+set\s+(?P<q1>")?(?P<cmd>.+?)(?(q1)")\s+'
                        r'(?P<q2>")?(?P<resp>.+)(?(q2)")$'), self._do_command_set),
            (re.compile(r'^!command\s+unset\s+(?P<q>")?(?P<cmd>.+)(?(q)")$'), self._do_command_unset),
        ]
        server = self.config['IRC'].get('server')
        port = self.config['IRC'].getint('port')
        nickname = self.config['IRC'].get('nickname')
        token = self.config['Twitch'].get('token')
        self.logger.info('Connecting to %s:%d', server, port)
        # The token is passed as the third element of the server tuple.
        super(TwitchClient, self).__init__([(server, port, token)], nickname, nickname)

    def connect_(self):
        """Open the connection by delegating to the base class ``_connect``."""
        self._connect()

    def process_data(self):
        """Run a single iteration of the IRC reactor."""
        self.reactor.process_once()

    def on_welcome(self, connection, event):
        """Request the Twitch capabilities and join the configured channels.

        ``IRC.channels`` is a comma-separated list of channel names without
        the leading ``#``. Surrounding whitespace is ignored and empty
        entries are skipped.
        """
        for capability in ('membership', 'tags', 'commands'):
            connection.cap('REQ', ':twitch.tv/{0}'.format(capability))
        configured = self.config['IRC'].get('channels') or ''
        for name in configured.split(','):
            name = name.strip()
            if not name:
                continue
            channel = '#{0}'.format(name)
            self.logger.info('Joining %s', channel)
            connection.join(channel)

    def on_join(self, connection, event):
        """Log the channel named by a join event."""
        self.logger.info('Joined %s', event.target)

    def on_pubmsg(self, connection, event):
        """Handle a public channel message."""
        self._process_message(connection, event)

    def on_whisper(self, connection, event):
        """Handle a whisper the same way as a channel message."""
        self._process_message(connection, event)

    def _is_mod(self, tags):
        """Return True if the ``badges`` tag carries a privileged badge.

        The tag value is a comma-separated list of ``name/version`` entries;
        only the name part is compared. A missing or empty tag yields False.
        """
        badges = tags.get('badges')
        if not badges:
            return False
        names = {badge.split('/')[0] for badge in badges.split(',')}
        return not names.isdisjoint(self._PRIVILEGED_BADGES)

    def _require_mod(self, tags, send_response):
        """Return True for privileged senders; otherwise send a refusal."""
        if self._is_mod(tags):
            return True
        send_response('Sorry @{0}, you are not allowed to do this'.format(tags['display-name']))
        return False

    def _send_response(self, connection, event, msg):
        """Reply through the same route the triggering event arrived on.

        Events targeted at a channel (``#...``) are answered in that channel;
        anything else is answered with a ``/w`` whisper to the sender, sent
        via the ``#jtv`` channel.
        """
        if event.target.startswith('#'):
            connection.privmsg(event.target, msg)
        else:
            connection.privmsg('#jtv', '/w {0} {1}'.format(event.source.nick, msg))

    def _process_message(self, connection, event):
        """Dispatch one incoming message to every matching handler.

        The message text is stripped of characters in Unicode category ``C*``
        and of trailing whitespace before matching. All matching patterns
        fire (there is no short-circuit), then custom commands whose key
        equals the message are answered.
        """
        tags = {tag['key']: tag['value'] for tag in event.tags}
        message = ''.join(
            char for char in event.arguments[0]
            if not unicodedata.category(char).startswith('C')
        ).rstrip()
        send_response = functools.partial(self._send_response, connection, event)
        for pattern, action in self.patterns + self.supported_commands:
            m = pattern.match(message)
            if m:
                action(tags, send_response, **m.groupdict())
        for cmd, resp in self.extra_commands.items():
            if cmd != message:
                continue
            user = tags['display-name']
            try:
                reply = resp.format(user=user)
            except (IndexError, KeyError, ValueError) as e:
                # Templates are chat-supplied; stray braces or unknown
                # placeholders must not take down message processing.
                self.logger.error('Failed to format response of %s: %r', cmd, e)
                continue
            send_response(reply)

    def _format_quote(self, quote):
        """Render a quote mapping as a chat line.

        The text is wrapped in double quotes unless it already contains one.
        The ``quote`` mapping itself is left unmodified.
        """
        fields = dict(quote)
        if '"' not in fields['text']:
            fields['text'] = '"{0}"'.format(fields['text'])
        return 'One time, Lilia said this... #{id}: {text} [{game}] [{date}]'.format(**fields)

    @staticmethod
    def _strip_quotes(text):
        """Drop one pair of enclosing double quotes, if present."""
        # The length check keeps a lone '"' from collapsing into ''.
        if len(text) >= 2 and text[0] == text[-1] == '"':
            return text[1:-1]
        return text

    def _add_quote(self, tags, send_response, user, id, text, game, date, **kwargs):
        """Store a quote announced in chat; failures are only logged."""
        text = self._strip_quotes(text)
        self.logger.info('Adding quote %s: %s', id, text)
        try:
            self.commands.add_quote(id, text, game, date)
        except CommandError as e:
            self.logger.error('Failed to add quote: %s', e)

    def _edit_quote(self, tags, send_response, user, id, text, game, date, **kwargs):
        """Update a quote announced as edited; failures are only logged."""
        text = self._strip_quotes(text)
        self.logger.info('Editing quote %s: %s', id, text)
        try:
            self.commands.edit_quote(id, text, game, date)
        except CommandError as e:
            self.logger.error('Failed to edit quote: %s', e)

    def _remove_quote(self, tags, send_response, user, id, **kwargs):
        """Delete a quote announced as removed; failures are only logged."""
        self.logger.info('Removing quote %s', id)
        try:
            self.commands.remove_quote(id)
        except CommandError as e:
            self.logger.error('Failed to remove quote: %s', e)

    def _record_sub(self, tags, send_response, user, **kwargs):
        """Record a new subscription (rank 1); failures are only logged.

        An optional ``time`` keyword (supplied when replaying history) is
        forwarded to the backend, otherwise ``None`` is passed.
        """
        self.logger.info('Recording new sub of %s', user)
        try:
            self.commands.record_regular_sub(user, 1, kwargs.get('time'))
        except CommandError as e:
            self.logger.error('Failed to record new sub: %s', e)

    def _record_resub(self, tags, send_response, user, rank, **kwargs):
        """Record a resubscription of ``rank`` months; failures are logged.

        ``rank`` arrives as the digit string captured by the regex and is
        converted to ``int`` for the backend.
        """
        self.logger.info('Recording %sx resub of %s', rank, user)
        try:
            self.commands.record_regular_sub(user, int(rank), kwargs.get('time'))
        except CommandError as e:
            self.logger.error('Failed to record resub: %s', e)

    def _record_gifted_sub(self, tags, send_response, giver, receiver, **kwargs):
        """Record a gifted subscription; failures are only logged."""
        self.logger.info('Recording gifted sub %s -> %s', giver, receiver)
        try:
            self.commands.record_gifted_sub(giver, receiver, kwargs.get('time'))
        except CommandError as e:
            self.logger.error('Failed to record gifted sub: %s', e)

    def _reply_with_quote(self, send_response, quote):
        """Send a quote, or a ``!quote <id>`` request when acting as proxy."""
        if self.config['Quotes'].getboolean('act_as_proxy'):
            send_response('!quote {0}'.format(quote['id']))
        else:
            send_response(self._format_quote(quote))

    def _do_lastquote(self, tags, send_response, **kwargs):
        """Answer ``!lastquote`` with the backend's last quote or an apology."""
        try:
            quote = self.commands.last_quote()
        except CommandError as e:
            send_response('Sorry @{0}, {1}'.format(tags['display-name'], e))
        else:
            self._reply_with_quote(send_response, quote)

    def _do_findquote(self, tags, send_response, filter, **kwargs):
        """Answer ``!findquote <filter>`` with a matching quote or an apology."""
        try:
            quote = self.commands.find_quote(filter)
        except CommandError as e:
            send_response('Sorry @{0}, {1}'.format(tags['display-name'], e))
        else:
            self._reply_with_quote(send_response, quote)

    def _replay_messages(self, tags, send_response, messages):
        """Feed ``(message, time)`` pairs through the announcement patterns.

        Each matching handler additionally receives the message's ``time``.
        A completion notice is sent once all messages were processed.
        """
        for message, time in messages:
            for pattern, action in self.patterns:
                m = pattern.match(message)
                if m:
                    action(tags, send_response, **m.groupdict(), time=time)
        send_response('Sync finished, @{0}'.format(tags['display-name']))

    def _do_syncquotes(self, tags, send_response, **kwargs):
        """Replay stored quote messages (privileged users only).

        A backend failure is logged and no completion notice is sent.
        """
        # NOTE(review): the completion notice is assumed to belong to the
        # success path only -- confirm against the intended behavior.
        if not self._require_mod(tags, send_response):
            return
        try:
            messages = self.commands.get_quote_messages()
        except CommandError as e:
            self.logger.error('Failed to get quote messages: %s', e)
        else:
            self._replay_messages(tags, send_response, messages)

    def _do_syncsubs(self, tags, send_response, **kwargs):
        """Replay stored subscription messages (privileged users only).

        A backend failure is logged and no completion notice is sent.
        """
        # NOTE(review): the completion notice is assumed to belong to the
        # success path only -- confirm against the intended behavior.
        if not self._require_mod(tags, send_response):
            return
        try:
            messages = self.commands.get_sub_messages()
        except CommandError as e:
            self.logger.error('Failed to get sub messages: %s', e)
        else:
            self._replay_messages(tags, send_response, messages)

    def _do_bellagram(self, tags, send_response, **kwargs):
        """Answer with the ``url`` of the backend's bellagram result."""
        try:
            bellagram = self.commands.bellagram()
        except CommandError as e:
            send_response('Sorry @{0}, {1}'.format(tags['display-name'], e))
        else:
            send_response(bellagram['url'])

    def _do_yt(self, tags, send_response, query, **kwargs):
        """Answer ``!yt <query>`` with ``title: url`` of the search result."""
        try:
            result = self.commands.query_youtube(query)
        except CommandError as e:
            send_response('Sorry @{0}, {1}'.format(tags['display-name'], e))
        else:
            send_response('{0}: {1}'.format(result['title'], result['url']))

    def _do_clip(self, tags, send_response, filter, **kwargs):
        """Answer ``!clip <filter>`` with ``title: url`` of the found clip."""
        try:
            result = self.commands.find_clip(filter)
        except CommandError as e:
            send_response('Sorry @{0}, {1}'.format(tags['display-name'], e))
        else:
            send_response('{0}: {1}'.format(result['title'], result['url']))

    def _do_command_set(self, tags, send_response, cmd, resp, **kwargs):
        """Create or overwrite a custom command (privileged users only).

        Any failure while storing or saving is logged and reported to the
        requester instead of propagating.
        """
        if not self._require_mod(tags, send_response):
            return
        try:
            self.extra_commands[cmd] = resp
            self.extra_commands.save()
        except Exception as e:
            self.logger.error('Failed to update extra commands: %s', e)
            send_response('Sorry @{0}, failed to set the command'.format(tags['display-name']))
        else:
            send_response('Command {0} set, @{1}'.format(cmd, tags['display-name']))

    def _do_command_unset(self, tags, send_response, cmd, **kwargs):
        """Remove a custom command (privileged users only).

        Removing an unknown command is not an error. Any failure while
        removing or saving is logged and reported to the requester.
        """
        if not self._require_mod(tags, send_response):
            return
        try:
            self.extra_commands.pop(cmd, None)
            self.extra_commands.save()
        except Exception as e:
            self.logger.error('Failed to update extra commands: %s', e)
            send_response('Sorry @{0}, failed to unset the command'.format(tags['display-name']))
        else:
            send_response('Command {0} unset, @{1}'.format(cmd, tags['display-name']))