"""Helpers for querying one YouTube channel through the YouTube Data API v3."""

import html

import dateutil.parser
import fuzzywuzzy.fuzz
import fuzzywuzzy.process
import googleapiclient
import googleapiclient.discovery
import googleapiclient.errors

BASE_URL = 'https://www.youtube.com'

# Prefix carried by every API resource kind, e.g. 'youtube#video'.
_KIND_PREFIX = 'youtube#'


def _resource_kind(raw_kind):
    """Return *raw_kind* without its 'youtube#' prefix.

    A string that does not contain the prefix is returned unchanged instead
    of raising ``IndexError``.
    """
    _, separator, suffix = raw_kind.partition(_KIND_PREFIX)
    return suffix if separator else raw_kind


def _single_item(response):
    """Return the last entry of ``response['items']``, or ``{}`` if none.

    The API answers a lookup for an unknown id with an *empty* ``items``
    list, so both a missing key and an empty list must fall back to an
    empty resource.
    """
    items = response.get('items') or [{}]
    return items[-1]


class YouTube(object):
    """Client bound to a single channel of the YouTube Data API v3."""

    def __init__(self, api_key, channel_id):
        """Build the API client and fetch the channel's metadata.

        Note that this performs a network request (see ``get_channel``).
        """
        self.client = googleapiclient.discovery.build(
            'youtube', 'v3', developerKey=api_key)
        self.channel = self.get_channel(channel_id)

    def get_thumbnail_url(self, thumbnails):
        """Return the URL of the best available thumbnail, or ``None``.

        *thumbnails* maps size names to dicts holding a ``'url'`` entry;
        sizes are tried in the order high, medium, default.
        """
        for key in ('high', 'medium', 'default'):
            if key in thumbnails:
                return thumbnails[key].get('url')
        return None

    def process_item(self, item):
        """Flatten an API resource dict into a plain result dict.

        Returns a dict with the keys ``kind``, ``link``, ``title``,
        ``description``, ``thumbnail_url``, ``live_broadcast`` and
        ``scheduled_start`` (a datetime, or ``None`` when the resource has
        no scheduled start time).
        """
        item_id = item.get('id', '')
        kind = _resource_kind(item.get('kind', _KIND_PREFIX))

        if kind == 'searchResult':
            # Search results wrap the real id and kind in a nested dict.
            id_info = item.get('id', {})
            item_id = id_info.get('videoId') or id_info.get('playlistId', '')
            kind = _resource_kind(id_info.get('kind', _KIND_PREFIX))

        if kind == 'playlist':
            link = '{0}/view_play_list?p={1}'.format(BASE_URL, item_id)
        else:
            link = '{0}/watch?v={1}'.format(BASE_URL, item_id)

        scheduled_start = item.get('liveStreamingDetails', {}).get(
            'scheduledStartTime')
        if scheduled_start:
            scheduled_start = dateutil.parser.parse(scheduled_start)

        snippet = item.get('snippet', {})
        return dict(
            kind=kind,
            link=link,
            # The API returns HTML-escaped text.
            title=html.unescape(snippet.get('title', '')),
            description=html.unescape(snippet.get('description', '')),
            thumbnail_url=self.get_thumbnail_url(
                snippet.get('thumbnails', {})),
            live_broadcast=snippet.get('liveBroadcastContent', 'none'),
            scheduled_start=scheduled_start
        )

    def process_items(self, items):
        """Apply ``process_item`` to every resource in *items*."""
        return [self.process_item(i) for i in items]

    def get_channel(self, channel_id):
        """Fetch a channel and return its id, link, title and thumbnail URL.

        An unknown *channel_id* yields a dict with empty values rather than
        raising ``IndexError``.
        """
        r = self.client.channels().list(
            id=channel_id, maxResults=1, part='id,snippet').execute()
        channel = _single_item(r)
        snippet = channel.get('snippet', {})
        return dict(
            id=channel.get('id', ''),
            link='{0}/c/{1}'.format(BASE_URL, snippet.get('customUrl', '')),
            title=html.unescape(snippet.get('title', '')),
            thumbnail_url=self.get_thumbnail_url(
                snippet.get('thumbnails', {}))
        )

    def get_video(self, video_id):
        """Fetch one video and return it in ``process_item`` form.

        An unknown *video_id* yields a result with empty values rather than
        raising ``IndexError``.
        """
        r = self.client.videos().list(
            id=video_id, maxResults=1,
            part='id,snippet,liveStreamingDetails').execute()
        return self.process_item(_single_item(r))

    def get_playlists(self):
        """Return every playlist of the channel, following all pages."""
        token = ''
        result = []
        while True:
            r = self.client.playlists().list(
                channelId=self.channel.get('id'),
                maxResults=50,
                part='id,snippet',
                pageToken=token
            ).execute()
            result.extend(self.process_items(r.get('items', [])))
            token = r.get('nextPageToken')
            if not token:
                break
        return result

    def search_videos(self, query, limit):
        """Search the channel's videos for *query*.

        Pages of at most 50 results are requested until the reported page
        sizes add up to *limit* or no further page exists. A non-positive
        *limit* returns an empty list without contacting the API.
        """
        if limit <= 0:
            return []

        count = limit
        token = ''
        result = []
        while True:
            r = self.client.search().list(
                channelId=self.channel.get('id'),
                q=query,
                safeSearch='none',
                type='video',
                maxResults=min(count, 50),
                part='id,snippet',
                pageToken=token
            ).execute()
            result.extend(self.process_items(r.get('items', [])))
            count -= r.get('pageInfo', {}).get('resultsPerPage', 0)
            if count <= 0:
                break
            token = r.get('nextPageToken')
            if not token:
                break
        return result

    def search(self, query):
        """Return the playlist or video that best matches *query*, or ``None``.

        Candidates are all channel playlists plus up to five video search
        hits. *query* is split on ``'|'``; parts starting with ``'-'`` are
        skipped, and each remaining part is fuzzy-matched against candidate
        titles and descriptions. The highest-scoring candidate wins.

        Raises:
            YouTubeError: if the API responds with an HTTP error.
        """
        try:
            results = self.get_playlists()
            results.extend(self.search_videos(query, limit=5))
        except googleapiclient.errors.HttpError as e:
            raise YouTubeError(str(e)) from e

        if not results:
            return None

        tokens = [
            t for t in query.split('|') if not t.strip().startswith('-')
        ] or ['']

        # Keyed by index so a match can be mapped back to its result:
        # with dict choices extractOne returns (value, score, key).
        titles = {i: r.get('title') for i, r in enumerate(results)}
        descriptions = {
            i: r.get('description') for i, r in enumerate(results)
        }

        matches = []
        for token in tokens:
            for choices in (titles, descriptions):
                match = fuzzywuzzy.process.extractOne(
                    token, choices,
                    scorer=fuzzywuzzy.fuzz.token_set_ratio,
                    score_cutoff=25)
                if match:
                    matches.append(match)

        if not matches:
            return None

        # max() keeps the first of equally scored matches.
        _, _, index = max(matches, key=lambda m: m[1])
        return results[index]


class YouTubeError(Exception):
    """Raised when a YouTube API request fails with an HTTP error."""