"""Flask application that authorizes Twitter users and receives webhook events.

The app exposes an OAuth flow (``/auth`` and ``/callback``) that stores the
resulting access tokens in ``tokens.json`` under the configured storage path,
and a ``/webhook`` endpoint that answers CRC challenges (GET) and pushes
signed event payloads (POST) onto a persistent SQLite-backed queue.
"""

import base64
import configparser
import hashlib
import hmac
import json
import logging
import pathlib

import flask
import persistqueue
import tweepy
from twitivity import Twitivity, TwitivityError

config = configparser.ConfigParser()
config.read('settings.cfg')

app = flask.Flask(__name__)
app.logger.setLevel(logging.INFO)
app.config.update(
    SERVER_NAME=config.get('Flask', 'server_name'),
    SECRET_KEY=config.get('Flask', 'secret_key'),
    ERROR_404_HELP=False,
    JSONIFY_PRETTYPRINT_REGULAR=True
)

storage_path = config.get('General', 'storage_path')

consumer_key = config.get('Twitter', 'consumer_key')
consumer_secret = config.get('Twitter', 'consumer_secret')
callback_url = config.get('Twitter', 'callback_url')
webhook_url = config.get('Twitter', 'webhook_url')
environment_name = config.get('Twitter', 'environment_name')

# Prefix used both in the CRC response token and in the signature header
# of incoming webhook requests.
_SIGNATURE_PREFIX = 'sha256='


def update_tokens(user_id, access_token):
    """Insert or replace the stored access token for *user_id*.

    The tokens live in ``tokens.json`` inside ``storage_path`` as a single
    JSON object keyed by user id. The file is created when missing; content
    that cannot be parsed as JSON (including an empty file) is treated as an
    empty mapping and overwritten.

    Args:
        user_id: Key under which the token is stored.
        access_token: JSON-serialisable token value.
    """
    path = pathlib.Path(storage_path, 'tokens.json')
    path.touch(exist_ok=True)
    with open(path, 'r+', encoding='utf-8') as f:
        try:
            tokens = json.load(f)
        except json.JSONDecodeError:
            tokens = {}
        tokens[user_id] = access_token
        # Rewrite in place: rewind, dump, then cut off any leftover tail of
        # the previous (possibly longer) content.
        f.seek(0)
        json.dump(tokens, f, sort_keys=True, indent=4)
        f.truncate()


def push_event(data):
    """Append *data* to the persistent SQLite queue at ``storage_path``."""
    queue = persistqueue.SQLiteQueue(storage_path)
    queue.put(data)
    queue.task_done()


def setup_webhook(api):
    """Make sure a webhook is registered and the user is subscribed to it.

    Args:
        api: Authenticated ``tweepy.API`` instance handed to ``Twitivity``.

    Returns:
        ``True`` when a webhook exists (or was registered) and a subscription
        exists (or was created); ``False`` when registering or subscribing
        raised ``TwitivityError`` (the error is logged).
    """
    twitivity = Twitivity(api, environment_name)

    # A failing lookup is treated the same as "no webhook registered yet".
    try:
        webhook_id = twitivity.check_webhook()
    except TwitivityError:
        webhook_id = None

    if not webhook_id:
        try:
            twitivity.register_webhook(webhook_url)
        except TwitivityError as e:
            app.logger.error(str(e))
            return False

    # check_subscription() raising is taken to mean "not subscribed yet".
    try:
        twitivity.check_subscription()
    except TwitivityError:
        try:
            twitivity.subscribe()
        except TwitivityError as e:
            app.logger.error(str(e))
            return False

    return True


def _is_signature_valid(signature, payload):
    """Return whether *signature* is the HMAC-SHA256 of *payload*.

    Args:
        signature: Raw header value, expected as ``sha256=<base64 digest>``;
            may be ``None`` or empty.
        payload: Raw request body as bytes.

    Returns:
        ``True`` only when the header carries the expected prefix, decodes as
        base64 and matches the digest keyed with ``consumer_secret``.
    """
    if not signature or not signature.startswith(_SIGNATURE_PREFIX):
        return False
    try:
        received = base64.b64decode(signature[len(_SIGNATURE_PREFIX):])
    except ValueError:
        # binascii.Error (bad padding) is a ValueError subclass; a plain
        # ValueError is raised for non-ASCII characters in the header.
        return False
    expected = hmac.new(consumer_secret.encode(), msg=payload,
                        digestmod=hashlib.sha256).digest()
    return hmac.compare_digest(expected, received)


@app.route('/auth')
def auth():
    """Start the OAuth flow and redirect the user to the authorization URL.

    The request token is kept in the session so ``/callback`` can finish the
    exchange.
    """
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret, callback_url)
    url = handler.get_authorization_url()
    flask.session['request_token'] = handler.request_token
    return flask.redirect(url)


@app.route('/callback')
def callback():
    """Finish the OAuth flow, store the access token and set up the webhook.

    Returns:
        ``('Success', 200)`` on success; ``('Authentication failure', 400)``
        when the session holds no request token; ``('Authentication failure',
        500)`` when the token exchange or credential check fails;
        ``('Webhook setup failure', 500)`` when ``setup_webhook`` fails.
    """
    # pop() both reads and removes the token; a missing token (callback hit
    # without a prior /auth, or an expired session) is a client error rather
    # than an unhandled KeyError.
    request_token = flask.session.pop('request_token', None)
    if request_token is None:
        return 'Authentication failure', 400

    handler = tweepy.OAuthHandler(consumer_key, consumer_secret, callback_url)
    handler.request_token = request_token
    try:
        handler.get_access_token(flask.request.args.get('oauth_verifier'))
    except tweepy.TweepError:
        return 'Authentication failure', 500

    api = tweepy.API(handler)
    try:
        user = api.verify_credentials(include_entities=False,
                                      skip_status=True,
                                      include_email=False)
    except tweepy.TweepError:
        return 'Authentication failure', 500
    if not user:
        return 'Authentication failure', 500

    update_tokens(user.id_str,
                  [handler.access_token, handler.access_token_secret])

    if not setup_webhook(api):
        return 'Webhook setup failure', 500

    return 'Success', 200


@app.route('/webhook', methods=['GET'])
def webhook_get():
    """Answer a CRC challenge.

    Responds with ``response_token`` set to ``sha256=`` followed by the
    base64-encoded HMAC-SHA256 of the ``crc_token`` query argument, keyed
    with the consumer secret. A request without ``crc_token`` gets a 403.
    """
    crc = flask.request.args.get('crc_token')
    if not crc:
        return 'Invalid request', 403
    digest = hmac.new(consumer_secret.encode(), msg=crc.encode(),
                      digestmod=hashlib.sha256).digest()
    token = _SIGNATURE_PREFIX + base64.b64encode(digest).decode()
    return dict(response_token=token), 200


@app.route('/webhook', methods=['POST'])
def webhook_post():
    """Queue an incoming webhook event after checking its signature.

    The ``X-Twitter-Webhooks-Signature`` header must match the HMAC-SHA256 of
    the raw request body; otherwise the request is rejected with a 403.
    """
    signature = flask.request.headers.get('X-Twitter-Webhooks-Signature')
    if not _is_signature_valid(signature, flask.request.get_data()):
        return 'Invalid request signature', 403
    push_event(flask.request.get_json())
    return '', 200


if __name__ == '__main__':
    app.run()