You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

56 lines
1.6 KiB

import asyncio
import datetime
import http
import os
import itsdangerous
import websockets
# WebSocket connections currently being served: serve() adds each connection
# when its handler starts and removes it when the handler exits.
clients = set()
# Raw datagram payloads waiting to be broadcast: Protocol.datagram_received()
# puts them here and serve() takes them off.
queue = asyncio.Queue()
class Protocol(asyncio.DatagramProtocol):
    """Datagram protocol that forwards every received payload to ``queue``."""

    def datagram_received(self, data, addr):
        """Enqueue the raw payload of one datagram.

        Args:
            data: The datagram payload, stored on the queue unmodified.
            addr: Address of the sender; not used.
        """
        queue.put_nowait(data)
async def serve(websocket, path):
    """Handle one WebSocket connection by relaying queued messages.

    The connection is registered in ``clients``; the handler then repeatedly
    takes one payload from ``queue``, strips and decodes it, and sends the
    resulting text to every registered connection. Connections found to be
    closed while sending are dropped from ``clients``, and the handler
    returns once its own connection is no longer registered.

    Args:
        websocket: The connection this handler is responsible for.
        path: Request path supplied by the server; not used.
    """

    async def send(ws, message):
        """Send ``message`` to ``ws``; return False if its connection is closed."""
        try:
            await ws.send(message)
        except websockets.exceptions.ConnectionClosed:
            return False
        return True

    print(datetime.datetime.utcnow().isoformat(), 'Connection from', websocket.remote_address)
    clients.add(websocket)
    try:
        while True:
            payload = await queue.get()
            # Payloads come straight off the datagram socket; a malformed one
            # must not raise and tear down whichever handler dequeued it.
            message = payload.strip().decode(errors='replace')
            print(datetime.datetime.utcnow().isoformat(), 'Sending', message)

            # Snapshot the set: it can change while the sends are in flight.
            recipients = list(clients)
            # gather() accepts coroutines directly, unlike asyncio.wait() on
            # current Python versions. return_exceptions=True keeps one
            # failing send from aborting the whole broadcast.
            outcomes = await asyncio.gather(
                *(send(ws, message) for ws in recipients),
                return_exceptions=True
            )
            for ws, outcome in zip(recipients, outcomes):
                if outcome is False:
                    # Closed connection: stop broadcasting to it.
                    clients.discard(ws)
                elif isinstance(outcome, BaseException):
                    print(datetime.datetime.utcnow().isoformat(), 'Send failed', repr(outcome))

            # Our own connection was found closed (by this or another
            # handler), so there is nothing left to serve.
            if websocket not in clients:
                return
    finally:
        # discard(): the connection may already have been pruned above.
        clients.discard(websocket)
async def process_request(path, request_headers):
    """Authorize a handshake request using the ``X-Events-Key`` header.

    The header value must load successfully with a
    ``TimedJSONWebSignatureSerializer`` keyed by the ``SECRET_KEY``
    environment variable.

    Args:
        path: Request path; not used.
        request_headers: Mapping of request headers; only ``.get`` is used.

    Returns:
        ``None`` when the key is accepted, otherwise a
        ``(status, headers, body)`` tuple carrying HTTP 401 and an empty body.
    """
    unauthorized = (http.HTTPStatus.UNAUTHORIZED, {}, b'')

    key = request_headers.get('X-Events-Key')
    if not key:
        return unauthorized

    serializer = itsdangerous.TimedJSONWebSignatureSerializer(os.getenv('SECRET_KEY'))
    try:
        serializer.loads(key)
    except itsdangerous.BadData:
        # BadData is the base class of BadSignature and SignatureExpired and
        # also covers BadPayload, so every malformed or stale key is answered
        # with 401 instead of escaping as an unhandled exception.
        return unauthorized
    return None
if __name__ == '__main__':
    # Prefer the already-current event loop. Newer Python versions raise
    # RuntimeError here instead of creating a loop implicitly, so create and
    # install one explicitly in that case.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # UDP endpoint on port 2828: Protocol puts each received datagram on the queue.
    start_consumer = loop.create_datagram_endpoint(Protocol, local_addr=('0.0.0.0', 2828))
    loop.run_until_complete(start_consumer)

    # WebSocket server on port 8765: process_request runs on each handshake
    # request and serve handles each connection.
    start_server = websockets.serve(serve, '0.0.0.0', 8765, process_request=process_request)
    loop.run_until_complete(start_server)

    loop.run_forever()