You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

93 lines
3.1 KiB

import hashlib
import json
import re
import requests
# Root of the Instagram web site; profile, post and GraphQL endpoint URLs
# in this module are all built from it.
BASE_URL = 'https://www.instagram.com'
# Value sent as the `query_hash` parameter of the GraphQL timeline query.
QUERY_HASH = '42323d64886122307be10013ad2dcc44'
# Captures the JSON object assigned to `window._sharedData` in a page's HTML,
# up to the closing `;</script>`. `.` does not match newlines here, so the
# whole object has to sit on a single line for the pattern to match.
SHARED_DATA = re.compile(r'window\._sharedData = (\{.*\});</script>')
class InstagramError(Exception):
    """Raised when media cannot be retrieved from Instagram."""
class Instagram(object):
    """Read the timeline media of one Instagram profile via the web endpoints.

    On construction the profile page is downloaded and the JSON embedded as
    ``window._sharedData`` is parsed.

    Attributes:
        username: The profile name passed to the constructor.
        user_id: The profile's id taken from the shared data, or ``None`` when
            the profile could not be loaded or parsed.
        rhx_gis: The ``rhx_gis`` value from the shared data (used to sign
            GraphQL requests), or ``None``.
        owner: ``dict`` with ``name`` and ``profile_pic_url`` of the profile,
            or ``None``.
    """

    # Number of media items requested per GraphQL page.
    _PAGE_SIZE = 50

    def __init__(self, username):
        """Load the profile page of *username* and extract its identifiers.

        Parsing failures do not raise: ``user_id``, ``rhx_gis`` and ``owner``
        are simply left as ``None`` and ``get_media`` will return ``None``.
        """
        self.username = username
        # Start from the "profile unavailable" state so that every attribute
        # exists even when parsing below fails part-way through.
        self.user_id = None
        self.rhx_gis = None
        self.owner = None

        shared_data = self._get_shared_data()
        try:
            graphql = shared_data['entry_data']['ProfilePage'][0]['graphql']
            user = graphql['user']
            user_id = user['id']
            rhx_gis = shared_data['rhx_gis']
            owner = dict(
                name=user['username'],
                profile_pic_url=user['profile_pic_url'])
        except (IndexError, KeyError, TypeError):
            # shared_data was None or did not have the expected layout.
            return

        self.user_id = user_id
        self.rhx_gis = rhx_gis
        self.owner = owner

    def _get_shared_data(self):
        """Return the decoded ``window._sharedData`` object of the profile page.

        Returns ``None`` when the HTTP response is not OK, when the page does
        not contain the shared-data script, or when the captured text is not
        valid JSON.
        """
        r = requests.get('{0}/{1}'.format(BASE_URL, self.username))
        if not r.ok:
            return None
        m = SHARED_DATA.search(r.text)
        if m is None:
            return None
        try:
            return json.loads(m.group(1))
        except ValueError:
            # Undecodable blob: treat it like a page without shared data so
            # the constructor ends in its regular "profile unavailable" state.
            return None

    @staticmethod
    def _caption_text(node):
        """Return the caption text of a media node, or '' if it has none."""
        captions = node['edge_media_to_caption']['edges']
        if not captions:
            # Posts without a caption carry an empty edge list.
            return ''
        return captions[0]['node']['text']

    def _media_item(self, node):
        """Convert one timeline media node into the flat dict given to callers."""
        return dict(
            # Drop the 'Graph' prefix, e.g. 'GraphImage' yields 'Image'.
            type=node['__typename'].split('Graph')[1],
            title=self._caption_text(node),
            url='{0}/p/{1}'.format(BASE_URL, node['shortcode']),
            display_url=node['display_url'],
            owner=self.owner['name'],
            owner_url='{0}/{1}'.format(BASE_URL, self.username),
            owner_pic_url=self.owner['profile_pic_url'])

    def _get_media(self):
        """Page through the profile's timeline and collect every media item.

        Returns:
            A list of dicts (see ``_media_item``), or ``None`` when a response
            does not have the expected structure.

        Raises:
            requests.exceptions.HTTPError: a GraphQL request returned an
                error status.
            ValueError: a response body was not valid JSON.
        """
        def request(count, cursor):
            """Fetch one page of *count* items starting after *cursor*."""
            variables = json.dumps(dict(
                id=self.user_id,
                first=count,
                after=cursor))
            # The request is signed with the MD5 hex digest of
            # "<rhx_gis>:<variables>", sent in the X-Instagram-GIS header.
            # The exact same `variables` string must be used in both places.
            gis = '{0}:{1}'.format(self.rhx_gis, variables)
            gis = hashlib.md5(gis.encode('UTF-8')).hexdigest()
            url = '{0}/graphql/query'.format(BASE_URL)
            params = dict(
                query_hash=QUERY_HASH,
                variables=variables)
            r = requests.get(
                url, params=params, headers={'X-Instagram-GIS': gis})
            r.raise_for_status()
            return r.json()

        result = []
        cursor = ''
        try:
            while True:
                payload = request(self._PAGE_SIZE, cursor)
                timeline = payload['data']['user'][
                    'edge_owner_to_timeline_media']
                for edge in timeline['edges']:
                    result.append(self._media_item(edge['node']))
                # An empty/None end cursor marks the last page.
                cursor = timeline['page_info']['end_cursor']
                if not cursor:
                    break
        except (KeyError, IndexError, TypeError):
            # Same "unexpected layout" handling as the constructor; TypeError
            # covers e.g. a null `user` object in the response.
            return None
        return result

    def get_media(self):
        """Return all timeline media of the profile.

        Returns:
            A list of media dicts, or ``None`` when the profile could not be
            loaded or a response had an unexpected structure.

        Raises:
            InstagramError: a GraphQL request failed with an HTTP error or
                returned a body that could not be decoded as JSON.
        """
        if not self.user_id:
            return None
        try:
            return self._get_media()
        except (requests.exceptions.HTTPError, ValueError) as e:
            # ValueError is the common base of the JSON decode errors raised
            # by the stdlib decoder and by alternative decoders.
            raise InstagramError(
                'Failed to retrieve media: {0}'.format(e)) from e