|
|
|
import fuzzywuzzy.fuzz
|
|
|
|
import fuzzywuzzy.process
|
|
|
|
import googleapiclient
|
|
|
|
import googleapiclient.discovery
|
|
|
|
|
|
|
|
|
|
|
|
# Root of the public YouTube site; prefix for every playlist, video and
# channel URL generated in this module.
BASE_URL = 'https://www.youtube.com'
|
|
|
|
|
|
|
|
|
|
|
|
class YoutubeError(Exception):
    """Error raised by this module when a YouTube API query fails."""
|
|
|
|
|
|
|
|
|
|
|
|
class Youtube(object):
    """Search YouTube channels through the YouTube Data API v3.

    Every search result is a plain ``dict`` with the keys ``kind``, ``url``,
    ``title``, ``description``, ``thumbnail_url``, ``channel_title``,
    ``channel_url`` and ``channel_thumbnail_url``.
    """

    # Largest page size requested per call; the search endpoint documents 50
    # as the upper bound of ``maxResults``.
    _MAX_PAGE_SIZE = 50

    def __init__(self, api_key):
        """Build the API client.

        Args:
            api_key: Developer key handed to the client as ``developerKey``.
        """
        self.client = googleapiclient.discovery.build('youtube', 'v3', developerKey=api_key)

    @staticmethod
    def _thumbnail_url(thumbnails):
        """Return a thumbnail URL, preferring 'high', then 'medium', then 'default'.

        Returns ``None`` when none of those sizes is present.
        """
        for size in ('high', 'medium', 'default'):
            if size in thumbnails:
                return thumbnails[size]['url']
        return None

    def _get_channel(self, channel_id):
        """Fetch the channel resource (``snippet`` part) for ``channel_id``.

        Raises:
            YoutubeError: If the API returns no channel for that id.
        """
        resp = self.client.channels().list(
            id=channel_id,
            maxResults=1,
            part='snippet').execute()
        items = resp.get('items') or []
        if not items:
            raise YoutubeError('Channel not found: {0}'.format(channel_id))
        return items[0]

    def _build_result(self, item, channel_info):
        """Turn one search item into a result dict, merged with ``channel_info``."""
        kind = item['id']['kind'].split('youtube#')[1]
        if kind == 'playlist':
            url = '{0}/view_play_list?p={1}'.format(BASE_URL, item['id']['playlistId'])
        else:
            url = '{0}/watch?v={1}'.format(BASE_URL, item['id']['videoId'])
        snippet = item['snippet']
        return dict(
            kind=kind,
            url=url,
            title=snippet['title'],
            description=snippet['description'],
            thumbnail_url=self._thumbnail_url(snippet.get('thumbnails') or {}),
            **channel_info)

    def _search(self, channel_id, query, playlists, limit):
        """Search one channel and return up to ``limit`` result dicts.

        Args:
            channel_id: Id of the channel to search in.
            query: Search expression passed to the API as ``q``.
            playlists: Search for playlists when true, for videos otherwise.
            limit: Maximum number of results; ``None`` means no limit (all
                pages are fetched). A limit of zero or less yields ``[]``
                without calling the API.

        Raises:
            YoutubeError: If the channel does not exist.
            googleapiclient.errors.HttpError: If an API request fails.
        """
        if limit is not None and limit <= 0:
            return []

        channel_snippet = self._get_channel(channel_id)['snippet']

        # ``customUrl`` is optional on a channel; fall back to the id-based
        # URL, which exists for every channel.
        custom_url = channel_snippet.get('customUrl')
        if custom_url:
            channel_url = '{0}/c/{1}'.format(BASE_URL, custom_url)
        else:
            channel_url = '{0}/channel/{1}'.format(BASE_URL, channel_id)
        channel_info = dict(
            channel_title=channel_snippet['title'],
            channel_url=channel_url,
            channel_thumbnail_url=self._thumbnail_url(channel_snippet.get('thumbnails') or {}))

        results = []
        remaining = limit  # None: keep paging until the API runs out
        page_token = None
        while True:
            if remaining is None:
                page_size = self._MAX_PAGE_SIZE
            else:
                page_size = min(remaining, self._MAX_PAGE_SIZE)
            params = dict(
                channelId=channel_id,
                q=query,
                safeSearch='none',
                type='playlist' if playlists else 'video',
                maxResults=page_size,
                part='id,snippet')
            if page_token:
                params['pageToken'] = page_token
            resp = self.client.search().list(**params).execute()

            items = resp.get('items') or []
            if not items:
                # An empty page cannot make progress; stop instead of looping.
                break
            for item in items:
                results.append(self._build_result(item, channel_info))

            if remaining is not None:
                # Count what was actually returned, not the advertised page size.
                remaining -= len(items)
                if remaining <= 0:
                    break

            page_token = resp.get('nextPageToken')
            if not page_token:
                break

        return results

    def search(self, channel_id, query, playlists=True, limit=None):
        """Search one channel for playlists (default) or videos.

        Args:
            channel_id: Id of the channel to search in.
            query: Search expression passed to the API as ``q``.
            playlists: Search for playlists when true, for videos otherwise.
            limit: Maximum number of results; ``None`` returns everything.

        Returns:
            A list of result dicts (see the class docstring).

        Raises:
            YoutubeError: If the API request fails or the channel is unknown.
        """
        try:
            return self._search(channel_id, query, playlists, limit)
        except googleapiclient.errors.HttpError as e:
            raise YoutubeError('Failed to query Youtube API: {0}'.format(e))

    def find_best_match(self, channel_ids, query):
        """Return the single result that best matches ``query``.

        The top playlist and the top video of every channel are collected,
        then each ``|``-separated alternative of ``query`` (alternatives
        starting with ``-`` are skipped) is fuzzy-matched against the
        candidates' titles and descriptions. The candidate behind the
        highest-scoring match wins.

        Args:
            channel_ids: Iterable of channel ids to search.
            query: Search expression; also the text used for fuzzy matching.

        Returns:
            The best result dict, or ``None`` when nothing was found.

        Raises:
            YoutubeError: If an API request fails or a channel is unknown.
        """
        results = []
        for channel_id in channel_ids:
            results.extend(self.search(channel_id, query, playlists=True, limit=1))
            results.extend(self.search(channel_id, query, playlists=False, limit=1))

        if not results:
            return None

        tokens = [t for t in query.split('|') if not t.strip().startswith('-')] or ['']

        # Keyed by result index so each match reports which result it came from.
        titles = {i: r['title'] for i, r in enumerate(results)}
        descriptions = {i: r['description'] for i, r in enumerate(results)}

        matches = []
        for token in tokens:
            for choices in (titles, descriptions):
                match = fuzzywuzzy.process.extractOne(
                    token, choices, scorer=fuzzywuzzy.fuzz.token_sort_ratio)
                if match is not None:
                    matches.append(match)

        if not matches:
            return results[0]

        # Each match is (text, score, result index); ties keep the earliest match.
        _, _, best_index = max(matches, key=lambda m: m[1])
        return results[best_index]
|