Commit 83997741 authored by Fred's avatar Fred

Add new file

parent d54e26f3
import websockets
import asyncio
from telegram import Bot
import json
WS_URL = "wss://bits.poul.org/ws"
CHAT_ID = 123 #CHANGE ME
MESSAGE_ID = 123 #CHANGE ME
TOKEN = "KEK"
bot = Bot(TOKEN)
def bits_handler(text):
meme = json.loads(text)
bot.edit_message_text("POuL HQ is now{}".format(meme["status"]["value"]), CHAT_ID, MESSAGE_ID)
return
class WebSocketClient:
def __init__(self, ws_url):
self.ws_url = ws_url
self.connection = None
async def connect(self):
self.connection = await websockets.client.connect(self.ws_url)
if self.connection.open:
return self.connection
async def receiveMessage(self, connection, message_handler):
while True:
try:
message = await connection.recv()
message_handler(message)
except websockets.exceptions.ConnectionClosed:
break
async def heartbeat(self):
while True:
try:
await self.connection.ping()
await asyncio.sleep(5)
except websockets.exceptions.ConnectionClosed:
break
if __name__ == '__main__':
client = WebSocketClient(WS_URL)
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(client.connect())
tasks = [
asyncio.ensure_future(client.heartbeat()),
asyncio.ensure_future(client.receiveMessage(connection, bits_handler)),
]
loop.run_until_complete(asyncio.wait(tasks))
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment