"""Relay UDP datagrams to authenticated WebSocket clients.

Datagrams received on UDP port 2828 are placed on a shared queue; connection
handlers take messages off that queue and fan each one out to every connected
WebSocket client. Clients must present a valid signed token in the
``X-Events-Key`` header during the opening handshake.
"""
import asyncio
import datetime
import http
import os

import itsdangerous
import websockets

# Every WebSocket connection that should receive broadcast messages.
clients = set()
# Raw datagram payloads (bytes) waiting to be broadcast.
queue = asyncio.Queue()


class Protocol(asyncio.DatagramProtocol):
    """UDP endpoint that enqueues every received datagram for broadcast."""

    def datagram_received(self, data, addr):
        """Queue the raw payload; the sender address is not used."""
        queue.put_nowait(data)


async def serve(websocket, path):
    """Handle one WebSocket connection.

    Registers the connection in ``clients`` and then repeatedly takes the next
    message off the shared queue and sends it to every registered client. The
    handler returns once its own connection has been found closed.

    Args:
        websocket: The connection object supplied by ``websockets.serve``.
        path: Request path supplied by ``websockets.serve`` (unused).
    """

    async def send(ws, message):
        """Send ``message`` to ``ws``, dropping ``ws`` if it is closed."""
        try:
            await ws.send(message)
        except websockets.exceptions.ConnectionClosed:
            # Forget dead peers so they are not retried on every message and
            # so their own handler can notice the disconnect and exit.
            clients.discard(ws)

    print(datetime.datetime.utcnow().isoformat(), 'Connection from', websocket.remote_address)
    clients.add(websocket)
    try:
        # Leave the loop once this connection has been discarded as closed.
        # NOTE(review): this assumes the library does not cancel the handler
        # when the peer disconnects; if it does, the check is simply unused.
        while websocket in clients:
            payload = await queue.get()
            # Datagrams come from the network and may not be valid UTF-8;
            # substitute bad bytes rather than crash this handler.
            message = payload.strip().decode(errors='replace')
            print(datetime.datetime.utcnow().isoformat(), 'Sending', message)
            # Snapshot the set: send() may discard entries while sends run.
            targets = list(clients)
            # gather() accepts coroutines directly; asyncio.wait() rejects
            # bare coroutines on Python 3.11+. return_exceptions=True keeps
            # the fan-out best-effort when a single client fails.
            results = await asyncio.gather(
                *(send(ws, message) for ws in targets),
                return_exceptions=True,
            )
            for result in results:
                if isinstance(result, Exception):
                    print(datetime.datetime.utcnow().isoformat(), 'Send failed', repr(result))
    finally:
        # discard(): the connection may already have been dropped by send().
        clients.discard(websocket)


async def process_request(path, request_headers):
    """Authenticate the opening handshake via the ``X-Events-Key`` header.

    Args:
        path: Request path (unused).
        request_headers: Headers of the HTTP upgrade request.

    Returns:
        ``None`` when the token verifies, which lets the handshake continue;
        otherwise an ``(UNAUTHORIZED, headers, body)`` tuple that rejects the
        request.
    """
    error = (http.HTTPStatus.UNAUTHORIZED, {}, b'')
    key = request_headers.get('X-Events-Key')
    if not key:
        return error
    # The signing secret is read from the SECRET_KEY environment variable.
    s = itsdangerous.TimedJSONWebSignatureSerializer(os.getenv('SECRET_KEY'))
    try:
        # Only validity matters here; the decoded payload is not used.
        s.loads(key)
    except (itsdangerous.SignatureExpired, itsdangerous.BadSignature):
        return error
    return None


if __name__ == '__main__':
    # Use the default loop: the module-level queue above was created before
    # this point and must be used from the same loop.
    loop = asyncio.get_event_loop()
    # UDP listener feeding the queue.
    start_consumer = loop.create_datagram_endpoint(Protocol, local_addr=('0.0.0.0', 2828))
    loop.run_until_complete(start_consumer)
    # WebSocket server; process_request gates every handshake.
    start_server = websockets.serve(serve, '0.0.0.0', 8765, process_request=process_request)
    loop.run_until_complete(start_server)
    loop.run_forever()