You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
137 lines
5.0 KiB
137 lines
5.0 KiB
import html
|
|
|
|
import dateutil.parser
|
|
import fuzzywuzzy.fuzz
|
|
import fuzzywuzzy.process
|
|
import googleapiclient
|
|
import googleapiclient.discovery
|
|
|
|
|
|
BASE_URL = 'https://www.youtube.com'
|
|
|
|
|
|
class YouTube(object):
|
|
def __init__(self, api_key, channel_id):
|
|
self.client = googleapiclient.discovery.build('youtube', 'v3', developerKey=api_key)
|
|
self.channel = self.get_channel(channel_id) or {}
|
|
|
|
def get_thumbnail_url(self, thumbnails):
|
|
for key in ('high', 'medium', 'default'):
|
|
if key in thumbnails:
|
|
return thumbnails[key].get('url')
|
|
|
|
def process_item(self, item):
|
|
id = item.get('id', '')
|
|
kind = item.get('kind', 'youtube#').split('youtube#')[1]
|
|
if kind == 'searchResult':
|
|
id = item.get('id', {}).get('videoId', '')
|
|
kind = item.get('id', {}).get('kind', 'youtube#').split('youtube#')[1]
|
|
if kind == 'playlist':
|
|
link = '{0}/view_play_list?p={1}'.format(BASE_URL, id)
|
|
else:
|
|
link = '{0}/watch?v={1}'.format(BASE_URL, id)
|
|
scheduled_start = item.get('liveStreamingDetails', {}).get('scheduledStartTime')
|
|
if scheduled_start:
|
|
scheduled_start = dateutil.parser.parse(scheduled_start)
|
|
return dict(
|
|
kind=kind,
|
|
link=link,
|
|
title=html.unescape(item.get('snippet', {}).get('title', '')),
|
|
description=html.unescape(item.get('snippet', {}).get('description', '')),
|
|
thumbnail_url=self.get_thumbnail_url(item.get('snippet', {}).get('thumbnails', {})),
|
|
live_broadcast=item.get('snippet', {}).get('liveBroadcastContent', 'none'),
|
|
scheduled_start=scheduled_start
|
|
)
|
|
|
|
def process_items(self, items):
|
|
return [self.process_item(i) for i in items]
|
|
|
|
def get_channel(self, channel_id):
|
|
r = self.client.channels().list(id=channel_id, maxResults=1, part='id,snippet').execute()
|
|
channels = r.get('items', [])
|
|
if not channels:
|
|
return None
|
|
channel = channels.pop()
|
|
return dict(
|
|
id=channel.get('id', ''),
|
|
link='{0}/c/{1}'.format(BASE_URL, channel.get('snippet', {}).get('customUrl', '')),
|
|
title=html.unescape(channel.get('snippet', {}).get('title', '')),
|
|
thumbnail_url=self.get_thumbnail_url(channel.get('snippet', {}).get('thumbnails', {}))
|
|
)
|
|
|
|
def get_video(self, video_id):
|
|
r = self.client.videos().list(id=video_id, maxResults=1, part='id,snippet,liveStreamingDetails').execute()
|
|
videos = r.get('items', [])
|
|
if not videos:
|
|
return None
|
|
return self.process_item(videos.pop())
|
|
|
|
def get_playlists(self):
|
|
token = ''
|
|
result = []
|
|
while True:
|
|
r = self.client.playlists().list(
|
|
channelId=self.channel.get('id'),
|
|
maxResults=50,
|
|
part='id,snippet',
|
|
pageToken=token
|
|
).execute()
|
|
result.extend(self.process_items(r.get('items', [])))
|
|
token = r.get('nextPageToken')
|
|
if not token:
|
|
break
|
|
return result
|
|
|
|
def search_videos(self, query, limit):
|
|
count = limit
|
|
token = ''
|
|
result = []
|
|
while True:
|
|
r = self.client.search().list(
|
|
channelId=self.channel.get('id'),
|
|
q=query,
|
|
safeSearch='none',
|
|
type='video',
|
|
maxResults=min(count, 50),
|
|
part='id,snippet',
|
|
pageToken=token
|
|
).execute()
|
|
result.extend(self.process_items(r.get('items', [])))
|
|
count -= r.get('pageInfo', {}).get('resultsPerPage', 0)
|
|
if count <= 0:
|
|
break
|
|
token = r.get('nextPageToken')
|
|
if not token:
|
|
break
|
|
return result
|
|
|
|
def search(self, query):
|
|
try:
|
|
results = self.get_playlists()
|
|
results.extend(self.search_videos(query, limit=5))
|
|
except googleapiclient.errors.HttpError as e:
|
|
raise YouTubeError(str(e))
|
|
if not results:
|
|
return None
|
|
tokens = [t for t in query.split('|') if not t.strip().startswith('-')] or ['']
|
|
matches = []
|
|
for token in tokens:
|
|
titles = {i: r.get('title') for i, r in enumerate(results)}
|
|
descriptions = {i: r.get('description') for i, r in enumerate(results)}
|
|
match = fuzzywuzzy.process.extractOne(token, titles,
|
|
scorer=fuzzywuzzy.fuzz.token_set_ratio, score_cutoff=25)
|
|
if match:
|
|
matches.append(match)
|
|
match = fuzzywuzzy.process.extractOne(token, descriptions,
|
|
scorer=fuzzywuzzy.fuzz.token_set_ratio, score_cutoff=25)
|
|
if match:
|
|
matches.append(match)
|
|
if not matches:
|
|
return None
|
|
_, _, i = sorted(matches, key=lambda m: m[1], reverse=True)[0]
|
|
return results[i]
|
|
|
|
|
|
class YouTubeError(Exception):
|
|
pass
|