Add twitch-webhooks service

master
Nikola Forró 5 years ago
parent 67a1186cfb
commit 7c891f18cf

@ -16,6 +16,7 @@ services:
- teespring-api
- twitch-cache-api
- twitch-subs-api
- twitch-webhooks
- cms
# Cheese Horde members API service with /data/horde-members mounted as database storage
@ -100,6 +101,23 @@ services:
expose:
- 5000
# Twitch webhooks service with /data/twitch-webhooks mounted as data storage
# TWITCH_CLIENT_ID, TWITCH_CHANNEL_ID and CALLBACK_URL
# are needed for Twitch API access and synchronization
twitch-webhooks:
build:
context: ./twitch-webhooks
volumes:
- /data/twitch-webhooks:/twitch-webhooks
environment:
- LEASE_FILE=/twitch-webhooks/subscriptions.json
- CONSUMER_ADDRESS=twitch-events:2828
- TWITCH_CLIENT_ID=__TWITCH_CLIENT_ID__
- TWITCH_CHANNEL_ID=__TWITCH_CHANNEL_ID__
- CALLBACK_URL=__CALLBACK_URL__
expose:
- 5000
# CMS service with /data/grav mounted as user data storage
cms:
build:

@ -128,6 +128,14 @@ http {
proxy_pass http://twitch-subs-api:5000;
}
location ^~ /twitch-webhooks/ {
rewrite ^/twitch-webhooks(/.*)$ $1 break;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_http_version 1.1;
proxy_pass http://twitch-webhooks:5000;
}
location ^~ /schedule {
return 301 /#livestreams;
}

@ -0,0 +1,96 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject

@ -0,0 +1,14 @@
FROM python:alpine
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir --requirement requirements.txt
RUN addgroup -g 9999 lilia
EXPOSE 5000
USER nobody:lilia
ENTRYPOINT ["python", "app.py"]

@ -0,0 +1,81 @@
import datetime
import hashlib
import hmac
import json
import logging
import os
import flask
import flask_apscheduler
import flask_restful
import flask_restful.reqparse
from sender import Sender
subscriptions = {}
app = flask.Flask(__name__)
app.logger.setLevel(logging.INFO)
app.config.update(
ERROR_404_HELP=False,
SCHEDULER_TIMEZONE='UTC',
SCHEDULER_JOBS=[
dict(id='subscribe',
func='twitch:Twitch.subscribe',
args=(app, subscriptions, 86400),
max_instances=1,
trigger='interval',
seconds=3600,
next_run_time=datetime.datetime.utcnow() + datetime.timedelta(seconds=10))])
scheduler = flask_apscheduler.APScheduler()
scheduler.init_app(app)
api = flask_restful.Api(app)
sender = Sender(app)
verification_parser = flask_restful.reqparse.RequestParser()
verification_parser.add_argument('hub.challenge', type=str, required=True)
verification_parser.add_argument('hub.lease_seconds', type=int, required=True)
verification_parser.add_argument('hub.mode', type=str, required=True)
verification_parser.add_argument('hub.topic', type=str, required=True)
class WebhooksResource(flask_restful.Resource):
def get(self):
args = verification_parser.parse_args()
verified = False
for sub in subscriptions.values():
h = '{0[hub.mode]}|{0[hub.topic]}|{0[hub.lease_seconds]}'.format(args)
if hashlib.sha256(h.encode()).hexdigest() == sub['request_hash']:
verified = True
break
if not verified:
flask_restful.abort(400, message='Verification failed')
r = app.make_response(args['hub.challenge'])
r.mimetype = 'text/plain'
r.status_code = 200
return r
def post(self):
sha, signature = flask.request.headers.get('X-Hub-Signature').split('=')
data = flask.request.data
for topic, sub in subscriptions.items():
mac = hmac.new(sub['secret'].encode(), data, getattr(hashlib, sha))
if not hmac.compare_digest(mac.hexdigest(), signature):
continue
message = flask.request.get_json(force=True)
message['topic'] = topic
sender.send(json.dumps(message).encode())
return None, 204
api.add_resource(WebhooksResource, '/webhooks')
if __name__ == '__main__':
scheduler.start()
app.run(host='0.0.0.0', threaded=True, debug=False)

@ -0,0 +1,5 @@
Flask
Flask-APScheduler
Flask-RESTful
python-dateutil
requests-futures

@ -0,0 +1,20 @@
import os
import socket
class Sender(object):
def __init__(self, app):
self.app = app
addr, port = os.getenv('CONSUMER_ADDRESS').split(':')
self.address = (addr, int(port))
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
def __del__(self):
self.socket.close()
def send(self, message):
try:
self.socket.sendto(message, self.address)
except Exception as e:
with self.app.app_context():
self.app.logger.error('Failed to send message: %s', str(e))

@ -0,0 +1,77 @@
import datetime
import hashlib
import json
import os
import random
import string
import dateutil.parser
from requests_futures.sessions import FuturesSession
TOPIC_URLS = {
'streams': 'https://api.twitch.tv/helix/streams?user_id={0}',
'followers': 'https://api.twitch.tv/helix/users/follows?first=1&to_id={0}',
}
class Twitch(object):
@classmethod
def subscribe(cls, app, subscriptions, lease_time):
def read_leases(lease_file):
result = dict(
streams=dict(secret=None, valid_until=None, request_hash=None),
followers=dict(secret=None, valid_until=None, request_hash=None))
try:
with open(lease_file) as f:
leases = json.load(f)
except IOError:
return result
else:
result.update(leases)
return result
def write_leases(lease_file, leases):
with open(lease_file, 'w') as f:
json.dump(leases, f, indent=4, sort_keys=True)
def generate_secret(length):
pool = string.ascii_letters + string.digits
return ''.join(random.SystemRandom().choice(pool) for _ in range(length))
def sub(topic, secret):
client_id = os.getenv('TWITCH_CLIENT_ID')
channel_id = os.getenv('TWITCH_CHANNEL_ID')
callback_url = os.getenv('CALLBACK_URL')
session = FuturesSession()
url = 'https://api.twitch.tv/helix/webhooks/hub'
headers = {'Client-ID': client_id}
data = {
'hub.mode': 'subscribe',
'hub.topic': TOPIC_URLS[topic].format(channel_id),
'hub.callback': callback_url,
'hub.lease_seconds': lease_time,
'hub.secret': secret,
}
h = '{0[hub.mode]}|{0[hub.topic]}|{0[hub.lease_seconds]}'.format(data)
request_hash = hashlib.sha256(h.encode()).hexdigest()
return session.post(url, headers=headers, data=data), request_hash
with app.app_context():
app.logger.info('Refreshing Twitch subscriptions')
lease_file = os.getenv('LEASE_FILE')
leases = read_leases(lease_file)
for topic, lease in leases.items():
if not lease['secret'] or not lease['valid_until']:
remaining = 0
if lease['valid_until']:
valid_until = dateutil.parser.parse(lease['valid_until'])
remaining = (valid_until - datetime.datetime.utcnow()).total_seconds()
if remaining < 3600:
app.logger.info('Subscription "%s" expired, re-requesting', topic)
secret = generate_secret(32)
valid_until = datetime.datetime.utcnow() + datetime.timedelta(seconds=lease_time)
request, request_hash = sub(topic, secret)
if request.result().ok:
lease['secret'] = secret
lease['valid_until'] = valid_until.isoformat()
lease['request_hash'] = request_hash
write_leases(lease_file, leases)
subscriptions.update(leases)
Loading…
Cancel
Save