Implement Twitter webhook endpoint

master
Nikola Forró 4 years ago
parent 2181bb49e1
commit 14471b7902

@ -0,0 +1,128 @@
import base64
import configparser
import hashlib
import hmac
import json
import logging
import pathlib
import flask
import persistqueue
import requests
import tweepy
config = configparser.ConfigParser()
config.read('settings.cfg')
app = flask.Flask(__name__)
app.logger.setLevel(logging.INFO)
app.config.update(
SERVER_NAME=config.get('Flask', 'server_name'),
SECRET_KEY=config.get('Flask', 'secret_key'),
ERROR_404_HELP=False,
JSONIFY_PRETTYPRINT_REGULAR=True
)
storage_path = config.get('General', 'storage_path')
consumer_key = config.get('Twitter', 'consumer_key')
consumer_secret = config.get('Twitter', 'consumer_secret')
callback_url = config.get('Twitter', 'callback_url')
webhook_url = config.get('Twitter', 'webhook_url')
environment_name = config.get('Twitter', 'environment_name')
def update_tokens(user_id, access_token):
path = pathlib.Path(storage_path, 'tokens.json')
path.touch(exist_ok=True)
with open(path, 'r+') as f:
try:
tokens = json.load(f)
except json.decoder.JSONDecodeError:
tokens = dict()
tokens.update({user_id: access_token})
f.seek(0)
json.dump(tokens, f, sort_keys=True, indent=4)
f.truncate()
def push_event(data):
queue = persistqueue.SQLiteQueue(storage_path)
queue.put(data)
queue.task_done()
def call_account_activity_api(api, method, endpoint, data=None):
url = 'https://{0}{1}/account_activity/all/{2}'.format(api.host, api.api_root, environment_name)
r = requests.request(method, '{0}/{1}'.format(url, endpoint), auth=api.auth.apply_auth(), data=data)
if not r.ok:
errors = r.json().get('errors')
if errors:
app.logger.error(errors.pop().get('message'))
return r
@app.route('/auth')
def auth():
handler = tweepy.OAuthHandler(consumer_key, consumer_secret, callback_url)
url = handler.get_authorization_url()
flask.session['request_token'] = handler.request_token
return flask.redirect(url)
@app.route('/callback')
def callback():
request_token = flask.session['request_token']
del flask.session['request_token']
handler = tweepy.OAuthHandler(consumer_key, consumer_secret, callback_url)
handler.request_token = request_token
try:
handler.get_access_token(flask.request.args.get('oauth_verifier'))
except tweepy.TweepError:
return 'Authentication failure', 500
api = tweepy.API(handler)
user = api.verify_credentials(include_entities=False, skip_status=True, include_email=False)
if not user:
return 'Authentication failure', 500
update_tokens(user.id_str, [handler.access_token, handler.access_token_secret])
# TODO: check if the webook already exists
if not call_account_activity_api(api, 'POST', 'webhooks.json', data=dict(url=webhook_url)).ok:
return 'Webhook registration failure', 500
# TODO: check if already subscribed
if not call_account_activity_api(api, 'POST', 'subscriptions.json').ok:
return 'Webhook subscription failure', 500
return 'Success', 200
@app.route('/webhook', methods=['GET'])
def webhook_get():
crc = flask.request.args.get('crc_token')
if not crc:
return 'Invalid request', 403
digest = hmac.new(consumer_secret.encode(), msg=crc.encode(), digestmod=hashlib.sha256).digest()
return dict(response_token='sha256={0}'.format(base64.b64encode(digest).decode())), 200
@app.route('/webhook', methods=['POST'])
def webhook_post():
def verify():
signature = flask.request.headers.get('X-Twitter-Webhooks-Signature')
if not signature:
return False
try:
crc = base64.b64decode(signature[7:])
digest = hmac.new(consumer_secret.encode(), msg=flask.request.get_data(), digestmod=hashlib.sha256).digest()
return hmac.compare_digest(digest, crc)
except base64.binascii.Error:
return False
if not verify():
return 'Invalid request signature', 403
push_event(flask.request.get_json())
return '', 200
if __name__ == '__main__':
app.run()
Loading…
Cancel
Save