diff --git a/comments-api/.gitignore b/comments-api/.gitignore new file mode 100644 index 0000000..6a18ad4 --- /dev/null +++ b/comments-api/.gitignore @@ -0,0 +1,96 @@ +# ---> Python +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# dotenv +.env + +# virtualenv +.venv +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject + diff --git a/comments-api/Dockerfile b/comments-api/Dockerfile new file mode 100644 index 0000000..3e9833f --- /dev/null +++ b/comments-api/Dockerfile @@ -0,0 +1,14 @@ +FROM python:alpine + +WORKDIR /app +COPY . . + +RUN pip install --no-cache-dir --requirement requirements.txt + +RUN addgroup -g 9999 lilia + +EXPOSE 5000 + +USER nobody:lilia + +ENTRYPOINT ["python", "app.py"] diff --git a/comments-api/app.py b/comments-api/app.py new file mode 100644 index 0000000..23479ff --- /dev/null +++ b/comments-api/app.py @@ -0,0 +1,172 @@ +import logging +import os + +import flask +import flask_apscheduler +import flask_restful +import flask_restful.fields +import flask_restful.reqparse +import sqlalchemy +import sqlalchemy.engine + +from db import db, Video, Comment, Association + + +app = flask.Flask(__name__) +app.logger.setLevel(logging.INFO) +app.config.update( + ERROR_404_HELP=False, + SQLALCHEMY_TRACK_MODIFICATIONS=False, + SQLALCHEMY_DATABASE_URI=os.getenv('SQLALCHEMY_DATABASE_URI'), + SCHEDULER_TIMEZONE='UTC', + SCHEDULER_JOBS=[dict( + id='sync', + func='sync:Sync.perform', + args=(app, db), + max_instances=1, + trigger='interval', + seconds=300)]) + +if app.config.get('SQLALCHEMY_DATABASE_URI', '').startswith('sqlite://'): + @sqlalchemy.event.listens_for(sqlalchemy.engine.Engine, 'connect') + def set_sqlite_pragma(dbapi_connection, connection_record): + dbapi_connection.execute('PRAGMA journal_mode=WAL') + dbapi_connection.execute('PRAGMA synchronous=NORMAL') + +db.init_app(app) +db.create_all(app=app) + +scheduler = flask_apscheduler.APScheduler() +scheduler.init_app(app) + +api = flask_restful.Api(app) + + +video_fields = { + 'id': flask_restful.fields.Integer(), + 'broadcast_type': flask_restful.fields.String(), + 'title': flask_restful.fields.String(), + 'description': flask_restful.fields.String(), + 'game': flask_restful.fields.String(), + 'length': flask_restful.fields.Integer(), + 'thumbnail_small': flask_restful.fields.String(), + 'thumbnail_medium': flask_restful.fields.String(), + 'thumbnail_large': flask_restful.fields.String(), + 'created_at': flask_restful.fields.DateTime(dt_format='iso8601'), + 'updated_at': flask_restful.fields.DateTime(dt_format='iso8601'), + 'recorded_at': flask_restful.fields.DateTime(dt_format='iso8601'), + 'published_at': flask_restful.fields.DateTime(dt_format='iso8601'), +} + +comment_fields = { + 'id': flask_restful.fields.String(attribute='comment.id'), + 'video_id': flask_restful.fields.Integer(), + 'offset': flask_restful.fields.Float(), + 'commenter_id': flask_restful.fields.Integer(attribute='comment.commenter_id'), + 'commenter_name': flask_restful.fields.String(attribute='comment.commenter_name'), + 'commenter_display_name': flask_restful.fields.String(attribute='comment.commenter_display_name'), + 'commenter_logo': flask_restful.fields.String(attribute='comment.commenter_logo'), + 'source': flask_restful.fields.String(attribute='comment.source'), + 'message_body': flask_restful.fields.String(attribute='comment.message_body'), + 'message_user_color': flask_restful.fields.String(attribute='comment.message_user_color'), + 'message_user_badges': flask_restful.fields.String(attribute='comment.message_user_badges'), + 'created_at': flask_restful.fields.DateTime(dt_format='iso8601', attribute='comment.created_at'), + 'updated_at': flask_restful.fields.DateTime(dt_format='iso8601', attribute='comment.updated_at'), +} + + +filter_parser = flask_restful.reqparse.RequestParser() +filter_parser.add_argument('filter', type=str) +filter_parser.add_argument('sort_by', type=str) +filter_parser.add_argument('sort_order', type=str) +filter_parser.add_argument('page_number', type=int) +filter_parser.add_argument('page_size', type=int) + + +class VideoResource(flask_restful.Resource): + @flask_restful.marshal_with(video_fields) + def get(self, id): + q = db.session.query(Video).filter(Video.id == id) + video = q.first() + if not video: + flask_restful.abort(404, message='Video {0} does not exist'.format(id)) + return video, 200 + + +class VideosResource(flask_restful.Resource): + @flask_restful.marshal_with(video_fields) + def get(self): + args = filter_parser.parse_args() + q = db.session.query(Video) + if args['filter']: + q = q.filter(Video.title.ilike('%{}%'.format(args['filter']))) + count = q.count() + if args['sort_order'] == 'random': + q = q.order_by(sqlalchemy.func.random()) + elif args['sort_by']: + col = getattr(Video, args['sort_by'], None) + if col: + if args['sort_order']: + order_by = getattr(col, args['sort_order'], None) + if order_by: + q = q.order_by(order_by()) + else: + q = q.order_by(col) + if args['page_size']: + q = q.limit(args['page_size']) + if args['page_number'] and args['page_size']: + q = q.offset(args['page_number'] * args['page_size']) + videos = q.all() + return videos, 200, {'X-Total-Count': count} + + +class CommentResource(flask_restful.Resource): + @flask_restful.marshal_with(comment_fields) + def get(self, video_id, comment_id): + q = db.session.query(Association).join(Comment).filter( + Association.video_id == video_id, Comment.id == comment_id) + assoc = q.first() + if not assoc: + flask_restful.abort(404, + message='Video {0} or comment {1} does not exist'.format(video_id, comment_id)) + return assoc, 200 + + +class CommentsResource(flask_restful.Resource): + @flask_restful.marshal_with(comment_fields) + def get(self, id): + args = filter_parser.parse_args() + q = db.session.query(Association).join(Comment).filter(Association.video_id == id) + if q.count() == 0: + flask_restful.abort(404, message='Video {0} does not exist'.format(id)) + if args['filter']: + q = q.filter(Comment.message_body.ilike('%{}%'.format(args['filter']))) + count = q.count() + if args['sort_order'] == 'random': + q = q.order_by(sqlalchemy.func.random()) + elif args['sort_by']: + col = getattr(Comment, args['sort_by'], None) + if col: + if args['sort_order']: + order_by = getattr(col, args['sort_order'], None) + if order_by: + q = q.order_by(order_by()) + else: + q = q.order_by(col) + if args['page_size']: + q = q.limit(args['page_size']) + if args['page_number'] and args['page_size']: + q = q.offset(args['page_number'] * args['page_size']) + assocs = q.all() + return assocs, 200, {'X-Total-Count': count} + + +api.add_resource(VideoResource, '/videos/') +api.add_resource(VideosResource, '/videos') +api.add_resource(CommentResource, '/videos//comments/') +api.add_resource(CommentsResource, '/videos//comments') + + +if __name__ == '__main__': + scheduler.start() + app.run(host='0.0.0.0', threaded=True, debug=False) diff --git a/comments-api/db.py b/comments-api/db.py new file mode 100644 index 0000000..3ae2b7f --- /dev/null +++ b/comments-api/db.py @@ -0,0 +1,48 @@ +import flask_sqlalchemy + + +db = flask_sqlalchemy.SQLAlchemy() + + +class Video(db.Model): + __tablename__ = 'videos' + + id = db.Column(db.Integer, primary_key=True) + broadcast_type = db.Column(db.String) + title = db.Column(db.String) + description = db.Column(db.String) + game = db.Column(db.String) + length = db.Column(db.Integer) + thumbnail_small = db.Column(db.String) + thumbnail_medium = db.Column(db.String) + thumbnail_large = db.Column(db.String) + created_at = db.Column(db.DateTime) + updated_at = db.Column(db.DateTime) + recorded_at = db.Column(db.DateTime) + published_at = db.Column(db.DateTime) + associations = db.relationship('Association') + + +class Comment(db.Model): + __tablename__ = 'comments' + + id = db.Column(db.String, primary_key=True) + commenter_id = db.Column(db.Integer) + commenter_name = db.Column(db.String) + commenter_display_name = db.Column(db.String) + commenter_logo = db.Column(db.String) + source = db.Column(db.String) + message_body = db.Column(db.String) + message_user_color = db.Column(db.String) + message_user_badges = db.Column(db.String) + created_at = db.Column(db.DateTime) + updated_at = db.Column(db.DateTime) + + +class Association(db.Model): + __tablename__ = 'association' + + video_id = db.Column(db.Integer, db.ForeignKey('videos.id'), primary_key=True) + comment_id = db.Column(db.String, db.ForeignKey('comments.id'), primary_key=True) + offset = db.Column(db.Float) + comment = db.relationship('Comment') diff --git a/comments-api/requirements.txt b/comments-api/requirements.txt new file mode 100644 index 0000000..eeacc1c --- /dev/null +++ b/comments-api/requirements.txt @@ -0,0 +1,5 @@ +Flask +Flask-APScheduler +Flask-RESTful +Flask-SQLAlchemy +requests-futures diff --git a/comments-api/sync.py b/comments-api/sync.py new file mode 100644 index 0000000..6d74365 --- /dev/null +++ b/comments-api/sync.py @@ -0,0 +1,98 @@ +import datetime +import os + +import flask_restful.inputs + +from db import Video, Comment, Association +from twitch import Twitch + + +class Sync(object): + @staticmethod + def _get(d, *keys, default=None): + try: + result = None + for key in keys: + if result: + if isinstance(result, list): + result = result[key] + else: + result = result.get(key, default) + else: + result = d.get(key, default) + return result + except (KeyError, IndexError): + return default + + @staticmethod + def _to_datetime(val): + if not val: + return None + result = flask_restful.inputs.datetime_from_iso8601(val) + return result.astimezone(tz=datetime.timezone.utc).replace(tzinfo=None) + + @classmethod + def perform(cls, app, db): + app.logger.info('Starting synchronization') + with app.app_context(): + twitch = Twitch(os.getenv('TWITCH_CLIENT_ID')) + updated = [] + for vid in twitch.fetch_videos(os.getenv('TWITCH_CHANNEL_ID')): + id = cls._get(vid, '_id', default='').lstrip('v') + if not id: + continue + if cls._get(vid, 'status') == 'recording': + continue + q = db.session.query(Video).filter(Video.id == id) + video = q.first() + if not video: + video = Video(id=id) + created_at = cls._get(vid, 'created_at') + updated_at = cls._to_datetime(cls._get(vid, 'updated_at', default=created_at)) + created_at = cls._to_datetime(created_at) + if video.updated_at and video.updated_at >= updated_at: + continue + updated.append(id) + video.broadcast_type = cls._get(vid, 'broadcast_type') + video.title = cls._get(vid, 'title') + video.description = cls._get(vid, 'description') + video.game = cls._get(vid, 'game') + video.length = cls._get(vid, 'length') + video.thumbnail_small = cls._get(vid, 'thumbnails', 'small', 0, 'url') + video.thumbnail_medium = cls._get(vid, 'thumbnails', 'medium', 0, 'url') + video.thumbnail_large = cls._get(vid, 'thumbnails', 'large', 0, 'url') + video.created_at = created_at + video.updated_at = updated_at + video.recorded_at = cls._to_datetime(cls._get(vid, 'recorded_at')) + video.published_at = cls._to_datetime(cls._get(vid, 'published_at')) + db.session.add(video) + db.session.commit() + app.logger.info('Updated videos: %s', ', '.join(updated) if updated else 'NONE') + for com in twitch.fetch_comments(updated): + q = db.session.query(Video).filter(Video.id == cls._get(com, 'content_id')) + video = q.first() + if not video: + continue + id = cls._get(com, '_id') + q = db.session.query(Association).filter(Video.id == video.id, Comment.id == id) + assoc = q.first() + if not assoc: + assoc = Association(comment=Comment(id=id)) + assoc.offset=cls._get(com, 'content_offset_seconds') + assoc.comment.commenter_id = cls._get(com, 'commenter', '_id') + assoc.comment.commenter_name = cls._get(com, 'commenter', 'name') + assoc.comment.commenter_display_name = cls._get(com, 'commenter', 'display_name') + assoc.comment.commenter_logo = cls._get(com, 'commenter', 'logo') + assoc.comment.source = cls._get(com, 'source') + assoc.comment.message_body = cls._get(com, 'message', 'body') + assoc.comment.message_user_color = cls._get(com, 'message', 'user_color') + badges = cls._get(com, 'message', 'user_badges') + if badges: + badges = ','.join(['{_id}:{version}'.format(**b) for b in badges]) + assoc.comment.message_user_badges = badges + assoc.comment.created_at = cls._to_datetime(cls._get(com, 'created_at')) + assoc.comment.updated_at = cls._to_datetime(cls._get(com, 'updated_at')) + video.associations.append(assoc) + db.session.add(video) + db.session.commit() + app.logger.info('Synchronization completed') diff --git a/comments-api/twitch.py b/comments-api/twitch.py new file mode 100644 index 0000000..d8501f8 --- /dev/null +++ b/comments-api/twitch.py @@ -0,0 +1,56 @@ +from requests_futures.sessions import FuturesSession + + +class Twitch(object): + def __init__(self, client_id): + self.client_id = client_id + + def fetch_videos(self, channel_id): + if not channel_id: + return [] + session = FuturesSession() + def get_videos(offset, limit): + url = 'https://api.twitch.tv/v5/channels/{0}/videos'.format(channel_id) + params = dict( + client_id=self.client_id, + offset=offset, + limit=limit) + return session.get(url, params=params) + request = get_videos(0, 1) + total = request.result().json().get('_total') + if not total: + return [] + requests = [] + limit = 100 + for offset in range(0, total, limit): + requests.append(get_videos(offset, limit)) + result = [] + for request in requests: + result.extend(request.result().json().get('videos', [])) + return result + + def fetch_comments(self, videos): + if not videos: + return [] + session = FuturesSession(max_workers=len(videos)) + def get_comments(video_id, cursor): + url = 'https://api.twitch.tv/v5/videos/{0}/comments'.format(video_id) + params = dict( + client_id=self.client_id, + cursor=cursor) + return session.get(url, params=params) + pairs = [(video_id, '') for video_id in videos] + result = [] + while pairs: + requests = [] + for video_id, cursor in pairs: + requests.append(get_comments(video_id, cursor)) + videos, _ = zip(*pairs) + pairs = [] + for video_id, request in zip(videos, requests): + data = request.result().json() + result.extend(data.get('comments', [])) + cursor = data.get('_next') + if cursor: + pairs.append((video_id, cursor)) + return result