"""Discord client: chat commands, announcement relaying and anonymous DM forwarding."""

import datetime
import functools
import io
import re
import time

import dateutil.parser
import discord
import fuzzywuzzy.process
import twitter

from commands import CommandError

# @everyone Lilia is Live! Playing $game ! Come and say hi at $url !
# The ``game`` and ``url`` groups are handed as keyword arguments to the
# configured Twitter announcement template (see ``_process_announcement``).
ANNOUNCEMENT_PATTERN = re.compile(r'''^
    @everyone\s+
    Lilia\s+is\s+Live!\s+
    Playing\s+(?P<game>.+)\s+!\s+
    Come\s+and\s+say\s+hi\s+at\s+(?P<url>.+)\s+!
    $''', re.VERBOSE)


def cooldown(retries, timeout, failure):
    """Build a decorator that rate-limits a handler per (server, user) pair.

    The decorated handler is called for the first ``retries`` invocations of
    a given (server, user) pair. After that, calls are routed to ``failure``
    until more than ``timeout`` seconds have elapsed since the last accepted
    call; the next call is then accepted and the counter restarts at one.

    Args:
        retries: Number of accepted calls before the cooldown kicks in.
        timeout: Cooldown length in seconds, measured from the last accepted call.
        failure: Callable invoked instead of the handler while cooling down.
            It receives exactly the same arguments as the handler.

    Returns:
        A decorator for callables with the signature
        ``(self, server, user, *args, **kwargs)``. The wrapper returns
        whatever the handler (or ``failure``) returns, un-awaited.
    """
    def do_cooldown(function):
        @functools.wraps(function)
        def wrapper(self, server, user, *args, **kwargs):
            # The bookkeeping lives on the wrapped function object, so it is
            # shared by every instance of the owning class.
            cooldowns = getattr(function, 'cooldowns', {})
            key = '{0}:{1}'.format(server, user)
            entry = cooldowns.setdefault(key, dict(tries=0, last_run=0))
            now = time.time()
            if entry['tries'] < retries:
                entry['tries'] += 1
            elif now - entry['last_run'] > timeout:
                # Cooldown expired: this call counts as the first of a new window.
                entry['tries'] = 1
            else:
                return failure(self, server, user, *args, **kwargs)
            entry['last_run'] = now
            setattr(function, 'cooldowns', cooldowns)
            return function(self, server, user, *args, **kwargs)
        return wrapper
    return do_cooldown


class DiscordClient(discord.Client):
    """Discord client that dispatches ``!commands`` and relays messages.

    Every command handler is called as
    ``handler(server, user, message, **groups)`` where ``groups`` are the
    named groups of the regular expression that matched the message.
    """

    def __init__(self, config, logger, commands, extra_commands):
        """Store collaborators, create the Twitter API client and the command table.

        Args:
            config: Mapping of sections; the ``Twitter`` and ``Discord``
                sections are read with ``get``/``getint``.
            logger: Logger used for informational messages.
            commands: Backend object implementing the actual commands.
            extra_commands: Mapping of exact message text to a response
                template formatted with ``user=<author mention>``.
        """
        self.config = config
        self.logger = logger
        self.commands = commands
        self.extra_commands = extra_commands
        twitter_config = self.config['Twitter']
        self.twitter_api = twitter.Api(
            consumer_key=twitter_config.get('consumer_key'),
            consumer_secret=twitter_config.get('consumer_secret'),
            access_token_key=twitter_config.get('access_token_key'),
            access_token_secret=twitter_config.get('access_token_secret'))
        # Named groups map straight onto the handlers' keyword arguments via
        # ``m.groupdict()``. The ``q``/``q1``/``q2`` groups only record an
        # optional opening double quote so that the conditional ``(?(q)")``
        # can require the matching closing quote; handlers absorb them
        # through ``**kwargs``.
        self.supported_commands = [
            (re.compile(r'^!lastquote$'), self._do_lastquote),
            (re.compile(r'^!findquote\s+(?P<q>")?(?P<filter>.+)(?(q)")$'),
             self._do_findquote),
            (re.compile(r'^!(bella(gram|pics)|insta(gram|bella))$'),
             self._do_bellagram),
            (re.compile(r'^!yt\s+(?P<q>")?(?P<query>.+)(?(q)")$'), self._do_yt),
            (re.compile(r'^!(find)?clip\s+(?P<q>")?(?P<filter>.+)(?(q)")$'),
             self._do_clip),
            (re.compile(r'^!cheese\s+(?P<q>")?(?P<query>.+)(?(q)")$'),
             self._do_cheese),
            (re.compile(r'^!roll(\s+(?P<q>")?(?P<formula>.+)(?(q)"))?$'),
             self._do_roll),
            (re.compile(r'^!nextstream$'), self._do_nextstream),
            (re.compile(r'^!conv(ert)?\s+(?P<q1>")?(?P<expression>.+?)(?(q1)")'
                        r'(\s+(?P<q2>")?(?P<unit>[^\s]+)(?(q2)"))?$'),
             self._do_convert),
        ]
        super(DiscordClient, self).__init__()

    def _replace_emojis(self, s, guild=None):
        """Replace ``name`` / ``:name:`` tokens in ``s`` by matching custom emojis.

        Every word (optionally wrapped in colons) is looked up by name, first
        among the emojis of ``guild`` (when given), then among all emojis
        visible to the client. Tokens without a matching emoji are left
        untouched.

        Args:
            s: Text to process.
            guild: Optional guild whose emojis take precedence.

        Returns:
            The text with the matching tokens replaced by ``str(emoji)``.
        """
        def repl(m):
            token = m.group(0)
            name = token.strip(':')
            emoji = None
            if guild:
                emoji = discord.utils.get(guild.emojis, name=name)
            if not emoji:
                emoji = discord.utils.get(self.emojis, name=name)
            return str(emoji) if emoji else token

        return re.sub(r'(?P<c>:)?\w+(?(c):)', repl, s)

    async def start_(self):
        """Start the client with the token from the ``Discord`` config section."""
        token = self.config['Discord'].get('token')
        await self.start(token)

    async def on_ready(self):
        """Log the name of the account the client is logged in as."""
        self.logger.info('Logged in as {0}'.format(self.user.name))

    async def on_message(self, message):
        """Dispatch an incoming message.

        Direct messages are forwarded to the anonymous channel and nothing
        else. Other messages are first checked for being a stream
        announcement, then matched against the built-in commands and finally
        against the exact-match extra commands.
        """
        if isinstance(message.channel, discord.DMChannel):
            await self._process_dm(message)
            return

        await self._process_announcement(message)

        server = message.guild.id if message.guild else None
        for pattern, action in self.supported_commands:
            m = pattern.match(message.content)
            if m:
                await action(server, message.author.id, message, **m.groupdict())

        for cmd, resp in self.extra_commands.items():
            if cmd == message.content:
                resp = self._replace_emojis(resp, guild=message.guild)
                await message.channel.send(resp.format(user=message.author.mention))

    async def on_member_join(self, member):
        """Welcome a new member and look for gifted subs matching their name.

        Nothing happens unless the guild's system channel is the configured
        ``new_members_channel`` and both the info and welcome channels can be
        resolved.
        """
        discord_config = self.config['Discord']
        new_members_channel = discord_config.getint('new_members_channel')
        info_channel = self.get_channel(discord_config.getint('info_channel'))
        welcome_pattern = discord_config.get('welcome_pattern')

        # A guild may have no system channel configured at all.
        system_channel = member.guild.system_channel
        if system_channel is None or system_channel.id != new_members_channel:
            return
        if not info_channel:
            return
        welcome_channel = self.get_channel(discord_config.getint('welcome_channel'))
        if not welcome_channel:
            return

        try:
            await welcome_channel.send(welcome_pattern.format(
                user=member.mention, info_channel=info_channel.mention))
        except discord.errors.Forbidden:
            # Best effort: the welcome message is skipped when the bot is not
            # allowed to post in the welcome channel.
            pass

        threshold = discord_config.getint('gifted_sub_threshold')
        self.logger.info('Looking for gifted subs matching {0}'.format(member.display_name))
        gifted_subs = self._guess_gifted_subs(member.display_name,
                                              datetime.datetime.utcnow(), threshold)
        if gifted_subs:
            await self._process_gifted_subs(member, gifted_subs)

    def _guess_gifted_subs(self, nick, time, threshold):
        """Fuzzy-match ``nick`` against the receivers of plausible gifted subs.

        Args:
            nick: Display name to look for.
            time: Reference datetime; passed to
                ``commands.plausible_gifted_subs`` and used to compute the
                age of each sub.
            threshold: Minimum fuzzy-match score for a candidate to be kept.

        Returns:
            A list of ``(receiver, score, days)`` tuples ordered by score
            (highest first), then by age (most recent first), then by
            receiver name (descending).
        """
        subs = self.commands.plausible_gifted_subs(time)
        choices = {i: s['receiver'] for i, s in enumerate(subs)}
        guesses = fuzzywuzzy.process.extractWithoutOrder(nick, choices,
                                                         score_cutoff=threshold)
        result = []
        for _, score, i in guesses:
            days = (time - dateutil.parser.parse(subs[i]['time'])).days
            result.append((subs[i]['receiver'], score, days))
        return sorted(result, key=lambda x: (x[1], 32 - x[2], x[0]), reverse=True)

    async def _process_gifted_subs(self, user, gifted_subs):
        """Post a table of gifted-sub candidates to the notice channel.

        Args:
            user: The member the candidates were computed for.
            gifted_subs: ``(nick, score, days)`` tuples as returned by
                ``_guess_gifted_subs``.
        """
        header = '{0: <20} | {1: <5} | {2: <11}'.format('Twitch username', 'Match', 'When')
        table = [header, '-' * len(header)]
        table.extend('{0: <20} | {1: >4}% | {2: >2} days ago'.format(nick, score, days)
                     for nick, score, days in gifted_subs)
        message = ('It seems that {0} could have been gifted a sub. '
                   'Here are the most probable candidates:\n```{1}```').format(
                       user.mention, '\n'.join(table))

        channel = self.get_channel(
            self.config['Discord'].getint('gifted_sub_notice_channel'))
        if not channel:
            return
        try:
            await channel.send(message)
        except discord.errors.Forbidden:
            # Best effort: the notice is dropped when posting is not allowed.
            pass

    async def _process_dm(self, message):
        """Forward a direct message, with its attachments, to the anonymous channel.

        The message is dropped when the anonymous channel cannot be resolved,
        when the sender is not a member of that channel's guild, or when the
        sender may not send messages in that channel.
        """
        anon_channel = self.get_channel(self.config['Discord'].getint('anon_channel'))
        if not anon_channel:
            return
        member = anon_channel.guild.get_member(message.channel.recipient.id)
        if not member:
            return
        if not member.permissions_in(anon_channel).send_messages:
            return

        files = []
        for attachment in message.attachments:
            # Do not wrap the buffer in a ``with`` block: it has to stay open
            # until ``send()`` below has uploaded its content.
            buffer = io.BytesIO()
            await attachment.save(buffer)
            files.append(discord.File(buffer, filename=attachment.filename,
                                      spoiler=attachment.is_spoiler()))

        content = self._replace_emojis(message.content, guild=anon_channel.guild)
        await anon_channel.send(content, files=files)

    async def _process_announcement(self, message):
        """Relay an announcer's message to the other announcement channels and Twitter.

        Only messages written by the configured ``announcer`` in one of the
        configured ``announcement_channels`` are considered. Processing stops
        as soon as one of the other channels already contains the same
        content within the last five minutes. If the content matches
        ``ANNOUNCEMENT_PATTERN``, a status built from the configured
        ``announcement_pattern`` is posted to Twitter.
        """
        announcer = self.config['Discord'].getint('announcer')
        channels = self.config['Discord'].get('announcement_channels').split(',')
        channels = [int(c) for c in channels]

        if not message.author or not message.channel:
            return
        if message.author.id != announcer or message.channel.id not in channels:
            return

        after = datetime.datetime.utcnow() - datetime.timedelta(minutes=5)
        targets = [self.get_channel(c) for c in channels if c != message.channel.id]
        for channel in targets:
            if channel is None:
                # Unknown or inaccessible channel id in the configuration.
                continue
            async for msg in channel.history(after=after):
                if msg.content == message.content:
                    return
            try:
                await channel.send(message.content)
            except discord.errors.Forbidden:
                # Best effort: skip channels the bot may not post in.
                pass

        announcement = self.config['Twitter'].get('announcement_pattern')
        m = ANNOUNCEMENT_PATTERN.match(message.content)
        if m:
            self.twitter_api.PostUpdate(announcement.format(**m.groupdict()))

    async def _send_error(self, message, error):
        """Reply in the channel of ``message`` with an apology describing ``error``."""
        await message.channel.send('Sorry {0}, {1}'.format(message.author.mention, error))

    async def _cooldown_failure(self, server, user, message, **kwargs):
        """Tell the author that the command is cooling down (``cooldown`` failure hook)."""
        await message.channel.send('Sorry {0}, you have to wait a while before running '
                                   'the same command again'.format(message.author.mention))

    def _format_quote(self, quote):
        """Render a quote mapping as a single chat line.

        Args:
            quote: Mapping providing at least ``id``, ``text``, ``game`` and
                ``date``. It is not modified.

        Returns:
            The formatted line; the text is wrapped in double quotes unless
            it already contains one.
        """
        text = quote['text']
        if '"' not in text:
            text = '"{0}"'.format(text)
        # Format from a copy so the caller's mapping is left untouched.
        fields = dict(quote, text=text)
        return 'One time, Lilia said this... `#{id}: {text} [{game}] [{date}]`'.format(**fields)

    async def _do_lastquote(self, server, user, message, **kwargs):
        """Handle ``!lastquote``: post the result of ``commands.last_quote()``."""
        try:
            result = self.commands.last_quote()
        except CommandError as e:
            await self._send_error(message, e)
        else:
            await message.channel.send(self._format_quote(result))

    async def _do_findquote(self, server, user, message, filter, **kwargs):
        """Handle ``!findquote <filter>``: post the result of ``commands.find_quote``."""
        try:
            result = self.commands.find_quote(filter)
        except CommandError as e:
            await self._send_error(message, e)
        else:
            await message.channel.send(self._format_quote(result))

    @cooldown(retries=2, timeout=5*60, failure=_cooldown_failure)
    async def _do_bellagram(self, server, user, message, **kwargs):
        """Handle the bellagram commands: post the result of ``commands.bellagram()`` as an embed."""
        try:
            bellagram = self.commands.bellagram()
        except CommandError as e:
            await self._send_error(message, e)
        else:
            embed = discord.Embed(title=bellagram['caption'], url=bellagram['url'],
                                  color=0xd2a517)
            embed.set_image(url=bellagram['display_url'])
            embed.set_author(name=bellagram['owner_username'],
                             url=bellagram['owner_url'],
                             icon_url=bellagram['owner_profile_pic_url'])
            await message.channel.send(embed=embed)

    @cooldown(retries=3, timeout=5*60, failure=_cooldown_failure)
    async def _do_yt(self, server, user, message, query, **kwargs):
        """Handle ``!yt <query>``: post a playlist embed or a plain title/URL pair."""
        try:
            result = self.commands.query_youtube(query)
        except CommandError as e:
            await self._send_error(message, e)
        else:
            if result['kind'] == 'playlist':
                embed = discord.Embed(title=result['title'], url=result['url'],
                                      description=result['description'],
                                      color=0xff0000)
                embed.set_thumbnail(url=result['thumbnail_url'])
                embed.set_author(name=result['channel_title'],
                                 url=result['channel_url'],
                                 icon_url=result['channel_thumbnail_url'])
                await message.channel.send(embed=embed)
            else:
                await message.channel.send('{title}\n{url}'.format(**result))

    @cooldown(retries=3, timeout=5*60, failure=_cooldown_failure)
    async def _do_clip(self, server, user, message, filter, **kwargs):
        """Handle ``!clip`` / ``!findclip <filter>``: post the clip's title and URL."""
        try:
            result = self.commands.find_clip(filter)
        except CommandError as e:
            await self._send_error(message, e)
        else:
            await message.channel.send('{title}\n{url}'.format(**result))

    @cooldown(retries=3, timeout=5*60, failure=_cooldown_failure)
    async def _do_cheese(self, server, user, message, query, **kwargs):
        """Handle ``!cheese <query>``: post the result of ``commands.query_cheese_com`` as an embed.

        Each summary entry of the form ``name: value`` becomes an embed
        field; entries that do not split into exactly two parts on ``:`` are
        filed under ``Base``.
        """
        try:
            result = self.commands.query_cheese_com(query)
        except CommandError as e:
            await self._send_error(message, e)
        else:
            embed = discord.Embed(title=result['name'], url=result['url'],
                                  description=result['description'],
                                  color=0xffd700)
            if result['image']:
                embed.set_thumbnail(url=result['image'])
                embed.set_image(url=result['image'])
            for point in result['summary']:
                try:
                    name, value = point.split(':')
                except ValueError:
                    name, value = 'Base', point
                embed.add_field(name=name.strip(), value=value.strip(), inline=True)
            await message.channel.send(embed=embed)

    async def _do_roll(self, server, user, message, formula, **kwargs):
        """Handle ``!roll [formula]``: post the result of ``commands.roll``.

        ``formula`` is ``None`` when the command is used without argument.
        """
        try:
            result = self.commands.roll(formula)
        except CommandError as e:
            await self._send_error(message, e)
        else:
            embed = discord.Embed(description=result, color=0xaaaccc)
            await message.channel.send(embed=embed)

    async def _do_nextstream(self, server, user, message, **kwargs):
        """Handle ``!nextstream``: post an embed describing ``commands.next_stream()``."""
        def format_delta(d):
            """Render a timedelta as ``In X days, Y hours, Z minutes``."""
            if d.total_seconds() <= 0:
                return 'In progress'
            days = d.days
            hours, rem = divmod(d.seconds, 60 * 60)
            mins = rem // 60
            parts = []
            if days > 0:
                parts.append('{0} day{1}'.format(days, 's' if days > 1 else ''))
            if hours > 0:
                parts.append('{0} hour{1}'.format(hours, 's' if hours > 1 else ''))
            # Minutes are always shown and may be zero ("0 minutes").
            parts.append('{0} minute{1}'.format(mins, '' if mins == 1 else 's'))
            return 'In ' + ', '.join(parts)

        try:
            result = self.commands.next_stream()
        except CommandError as e:
            await self._send_error(message, e)
        else:
            embed = discord.Embed(title=result['title'],
                                  url='https://www.twitch.tv/events/{0}'.format(result['id']),
                                  description=result['description'],
                                  color=0x4b367c)
            embed.set_thumbnail(url=result['cover_image_url'].format(width=128, height=72))
            embed.set_image(url=result['game_box_large'])
            embed.set_author(name='lilialil', url='https://twitch.tv/lilialil',
                             icon_url='https://static-cdn.jtvnw.net/jtv_user_pictures/lilialil-profile_image-7601a69cf14adae1-300x300.png')
            # NOTE(review): the subtraction below needs ``result['start']`` to
            # parse to a naive datetime (presumably UTC) - confirm.
            start = dateutil.parser.parse(result['start'])
            embed.add_field(name=format_delta(start - datetime.datetime.utcnow()),
                            value=start.strftime('%Y-%m-%d %I:%M %p GMT'),
                            inline=False)
            await message.channel.send(embed=embed)

    async def _do_convert(self, server, user, message, expression, unit, **kwargs):
        """Handle ``!conv`` / ``!convert <expression> [unit]``: post the result of ``commands.convert``.

        ``unit`` is ``None`` when no target unit is given.
        """
        try:
            result = self.commands.convert(expression, unit)
        except CommandError as e:
            await self._send_error(message, e)
        else:
            embed = discord.Embed(description='**{0}**'.format(result), color=0x39ad0f)
            await message.channel.send(embed=embed)