import html
import dateutil.parser
import fuzzywuzzy.fuzz
import fuzzywuzzy.process
import googleapiclient
import googleapiclient.discovery
# Root of the public YouTube site; the watch, playlist and channel links
# built in this module are all formatted relative to it.
BASE_URL = 'https://www.youtube.com'
class YouTube(object):
    """Wrapper around the YouTube Data API v3 client, scoped to one channel.

    On construction the API client is built and a summary of the channel is
    fetched; playlist listing and video search are then restricted to that
    channel's ID.
    """

    def __init__(self, api_key, channel_id):
        """Build the API client and load the channel summary.

        Args:
            api_key: Developer key handed to the API client builder.
            channel_id: ID of the channel that listings and searches use.
        """
        self.client = googleapiclient.discovery.build('youtube', 'v3', developerKey=api_key)
        self.channel = self.get_channel(channel_id)

    @staticmethod
    def _last_item(response):
        """Return the last entry of ``response['items']`` or ``{}`` if none.

        A missing key, ``None`` and an empty list are all treated as
        "nothing found", so callers never hit an ``IndexError``.
        """
        items = response.get('items') or [{}]
        return items[-1]

    @staticmethod
    def _strip_kind_prefix(raw_kind):
        """Return the part of a kind string that follows ``'youtube#'``.

        A string that does not contain the prefix is returned unchanged
        instead of raising ``IndexError``.
        """
        parts = raw_kind.split('youtube#')
        return parts[1] if len(parts) > 1 else raw_kind

    def get_thumbnail_url(self, thumbnails):
        """Return the URL of the best available thumbnail.

        Args:
            thumbnails: Mapping of size name to a dict holding a ``'url'``.

        Returns:
            The ``'url'`` of the first of ``'high'``, ``'medium'``,
            ``'default'`` present in ``thumbnails``, or ``None`` when none
            of those keys is present.
        """
        for key in ('high', 'medium', 'default'):
            if key in thumbnails:
                return thumbnails[key].get('url')
        return None

    def process_item(self, item):
        """Flatten one API resource dict into the summary dict used here.

        Args:
            item: Resource dict as returned by the API (video, playlist or
                search result).

        Returns:
            A dict with the keys ``kind``, ``link``, ``title``,
            ``description``, ``thumbnail_url``, ``live_broadcast`` and
            ``scheduled_start`` (a parsed datetime, or the falsy original
            value when no scheduled start time is present).
        """
        snippet = item.get('snippet', {})
        item_id = item.get('id', '')
        kind = self._strip_kind_prefix(item.get('kind', 'youtube#'))
        if kind == 'searchResult':
            # Search results nest the real kind and ID inside an 'id' dict.
            id_info = item.get('id', {})
            item_id = id_info.get('videoId') or id_info.get('playlistId', '')
            kind = self._strip_kind_prefix(id_info.get('kind', 'youtube#'))
        if kind == 'playlist':
            link = '{0}/view_play_list?p={1}'.format(BASE_URL, item_id)
        else:
            link = '{0}/watch?v={1}'.format(BASE_URL, item_id)
        scheduled_start = item.get('liveStreamingDetails', {}).get('scheduledStartTime')
        if scheduled_start:
            scheduled_start = dateutil.parser.parse(scheduled_start)
        return dict(
            kind=kind,
            link=link,
            title=html.unescape(snippet.get('title', '')),
            description=html.unescape(snippet.get('description', '')),
            thumbnail_url=self.get_thumbnail_url(snippet.get('thumbnails', {})),
            live_broadcast=snippet.get('liveBroadcastContent', 'none'),
            scheduled_start=scheduled_start
        )

    def process_items(self, items):
        """Apply :meth:`process_item` to every element and return a list."""
        return [self.process_item(i) for i in items]

    def get_channel(self, channel_id):
        """Fetch a channel and return its summary dict.

        Args:
            channel_id: ID of the channel to look up.

        Returns:
            A dict with ``id``, ``link``, ``title`` and ``thumbnail_url``.
            When the response contains no items the fields fall back to
            empty strings / ``None``.
        """
        r = self.client.channels().list(id=channel_id, maxResults=1, part='id,snippet').execute()
        channel = self._last_item(r)
        snippet = channel.get('snippet', {})
        return dict(
            id=channel.get('id', ''),
            link='{0}/c/{1}'.format(BASE_URL, snippet.get('customUrl', '')),
            title=html.unescape(snippet.get('title', '')),
            thumbnail_url=self.get_thumbnail_url(snippet.get('thumbnails', {}))
        )

    def get_video(self, video_id):
        """Fetch a video and return it in :meth:`process_item` form.

        Args:
            video_id: ID of the video to look up.

        Returns:
            The processed summary dict; when the response contains no items
            the summary of an empty resource is returned.
        """
        r = self.client.videos().list(id=video_id, maxResults=1, part='id,snippet,liveStreamingDetails').execute()
        return self.process_item(self._last_item(r))

    def get_playlists(self):
        """Return the processed playlists of the channel, following pages.

        Requests pages of up to 50 playlists until the response carries no
        ``nextPageToken``.
        """
        token = ''
        result = []
        while True:
            r = self.client.playlists().list(
                channelId=self.channel.get('id'),
                maxResults=50,
                part='id,snippet',
                pageToken=token
            ).execute()
            result.extend(self.process_items(r.get('items', [])))
            token = r.get('nextPageToken')
            if not token:
                break
        return result

    def search_videos(self, query, limit):
        """Search the channel's videos and return processed results.

        Args:
            query: Search string passed as the ``q`` parameter.
            limit: Number of results wanted; each request asks for at most
                50 and the remaining budget is reduced by the page size the
                response reports.

        Returns:
            A list of processed result dicts; empty when ``limit`` is not
            positive (no request is made in that case).
        """
        if limit <= 0:
            return []
        count = limit
        token = ''
        result = []
        while True:
            r = self.client.search().list(
                channelId=self.channel.get('id'),
                q=query,
                safeSearch='none',
                type='video',
                maxResults=min(count, 50),
                part='id,snippet',
                pageToken=token
            ).execute()
            result.extend(self.process_items(r.get('items', [])))
            count -= r.get('pageInfo', {}).get('resultsPerPage', 0)
            if count <= 0:
                break
            token = r.get('nextPageToken')
            if not token:
                break
        return result

    def search(self, query):
        """Return the playlist or video that best matches ``query``.

        All channel playlists plus up to five video search hits form the
        candidate set. The query is split on ``'|'``; parts starting with
        ``'-'`` are not used for matching. Each remaining part is fuzzy
        matched against candidate titles and descriptions, and the
        candidate behind the highest-scoring match is returned.

        Args:
            query: Search string.

        Returns:
            The best matching processed dict, or ``None`` when there are no
            candidates or no match reaches the score cutoff.

        Raises:
            YouTubeError: If the API client raises an ``HttpError``.
        """
        try:
            results = self.get_playlists()
            results.extend(self.search_videos(query, limit=5))
        except googleapiclient.errors.HttpError as e:
            raise YouTubeError(str(e)) from e
        if not results:
            return None
        tokens = [t for t in query.split('|') if not t.strip().startswith('-')] or ['']
        # Keyed by position so a match can be mapped back to its result;
        # built once because they do not depend on the token.
        titles = {i: r.get('title') for i, r in enumerate(results)}
        descriptions = {i: r.get('description') for i, r in enumerate(results)}
        matches = []
        for token in tokens:
            for choices in (titles, descriptions):
                match = fuzzywuzzy.process.extractOne(
                    token, choices,
                    scorer=fuzzywuzzy.fuzz.token_set_ratio, score_cutoff=25)
                if match:
                    matches.append(match)
        if not matches:
            return None
        # Each match is (text, score, key); max keeps the earliest of ties.
        _, _, index = max(matches, key=lambda m: m[1])
        return results[index]
class YouTubeError(Exception):
    """Error raised by this module when a YouTube API request fails."""