# Tevolve2MQTT/src/pytevolve.py
# File-viewer metadata: 2025-03-23 12:59:17 +00:00, 236 lines, 7.6 KiB, Python

import json
import os
import threading
import time
from urllib.parse import urlencode

import requests
import websocket
# Form body for the OAuth-style password grant sent to the token endpoint.
# The account can be overridden with the TEVOLVE_USERNAME / TEVOLVE_PASSWORD
# environment variables; the literals below are kept only as backward-compatible
# defaults.
# SECURITY NOTE(review): real credentials should not live in source control --
# prefer setting the environment variables and rotating the password below.
data = urlencode({
    "username": os.environ.get("TEVOLVE_USERNAME", "1202283@uad.ac.uk"),
    "password": os.environ.get("TEVOLVE_PASSWORD", "24e76d8e4"),
    "grant_type": "password",
})

# Endpoint that exchanges the credentials above for an access token.
url = "https://api-tevolve.termoweb.net/client/token"

# Headers for the token request: form-encoded body plus HTTP Basic client
# authentication.
headers = {
    'content-type': 'application/x-www-form-urlencoded',
    'authorization': 'Basic NWM0OWRjZTk3NzUxMDM1MTUwNmM0MmRiOnRldm9sdmU=',
}

# Identifier of the device (gateway) whose heaters are controlled.
dev_id = "082e858131ff012c51"
class Tevolve:
    """Static client for the Tevolve (TermoWeb) cloud API.

    The class is used as a namespace: all state lives in class attributes and
    every method is a ``@staticmethod``. It obtains an access token, performs
    the socket.io (Engine.IO v3) polling handshake, keeps a websocket open to
    receive push messages, and reads/writes heater status over REST.

    ``token_manager`` relies on the module-level ``url``, ``headers`` and
    ``data`` names for the token request.
    """

    # --- configuration -----------------------------------------------------
    API_BASE = "https://api-tevolve.termoweb.net"
    WS_BASE = "wss://api-tevolve.termoweb.net"
    # Device id used in every socket.io and REST URL (same value the original
    # code hard-coded into each URL string).
    device_id = "082e858131ff012c51"
    # Seconds before an HTTP request is abandoned; without a timeout
    # ``requests`` can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    # --- shared state ------------------------------------------------------
    setmode = ""            # mode that set_mode() will send ("heat" is sent as "manual")
    setTemperature = ""     # not read in this class; presumably set by callers
    token = None            # bearer token; None forces token_manager() to fetch one
    sid = ""                # Engine.IO session id from the polling handshake
    websocket_url = ""
    mode = ""
    set_point = ""
    websocket_message = ""  # most recent raw message received on the websocket
    # Per-room values; not read or written in this class -- presumably filled
    # in by the code that consumes websocket_message.
    hallway_mode = ""
    hallway_current_temp = ""
    hallway_set_temp = ""
    hallway_active = ""
    livingroom_mode = ""
    livingroom_current_temp = ""
    livingroom_set_temp = ""
    livingroom_active = ""

    # --- private helpers ---------------------------------------------------
    @staticmethod
    def _api_headers():
        """Return the browser-like headers used for every REST status call."""
        return {
            'host': 'api-tevolve.termoweb.net',
            'origin': 'https://tevolve.termoweb.net',
            'content-type': 'application/json',
            'accept': 'application/json, text/plain, */*',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) '
                          'Version/15.4 Safari/605.1.15',
            'authorization': 'Bearer ' + Tevolve.token,
            'referer': 'https://tevolve.termoweb.net/',
        }

    @staticmethod
    def _status_url(heater_id):
        """Return the REST status URL for the heater ``heater_id``."""
        return f"{Tevolve.API_BASE}/api/v2/devs/{Tevolve.device_id}/htr/{heater_id}/status"

    @staticmethod
    def _parse_sid(handshake_text):
        """Extract ``sid`` from an Engine.IO polling handshake response.

        The JSON object is preceded by a framing prefix whose width varies,
        and may be followed by further packets, so decoding starts at the
        first ``{`` and stops at the end of that object.

        Raises:
            ValueError: if the text contains no JSON object.
            KeyError: if the object has no ``sid`` key.
        """
        start = handshake_text.index("{")
        packet, _ = json.JSONDecoder().raw_decode(handshake_text, start)
        return packet["sid"]

    @staticmethod
    def _namespace_connect_payload():
        """Return the framed socket.io "connect to namespace" packet.

        Engine.IO v3 polling payloads are ``<length>:<packet>``; the length is
        computed so the frame stays valid whatever the token length is.
        """
        packet = f"40/api/v2/socket_io?token={Tevolve.token}&dev_id={Tevolve.device_id}"
        return f"{len(packet)}:{packet}"

    # --- authentication and socket.io handshake ------------------------------
    @staticmethod
    def token_manager():
        """Block until ``Tevolve.token`` holds an access token.

        Does nothing when a token is already set. Otherwise posts the
        module-level credentials to the token endpoint, retrying every
        30 seconds on a non-200 answer or a network error.
        """
        while Tevolve.token is None:
            try:
                x = requests.post(url, headers=headers, data=data,
                                  timeout=Tevolve.REQUEST_TIMEOUT)
            except requests.RequestException as exc:
                print(exc)
                time.sleep(30)
                continue
            print(x)
            if x.status_code == 200:
                Tevolve.token = x.json()["access_token"]
            else:
                time.sleep(30)

    @staticmethod
    def get_sid():
        """Run the polling handshake and store the session id in ``Tevolve.sid``.

        On a non-200 answer the token is discarded and re-fetched, then the
        handshake is tried one more time so that ``sid`` matches the new
        token. If the second attempt fails too, ``sid`` is left unchanged.
        """
        for _attempt in range(2):
            time.sleep(5)
            url = (f"{Tevolve.API_BASE}/socket.io/?token={Tevolve.token}"
                   f"&dev_id={Tevolve.device_id}&EIO=3&transport=polling")
            response = requests.get(url, timeout=Tevolve.REQUEST_TIMEOUT)
            if response.status_code == 200:
                Tevolve.sid = Tevolve._parse_sid(response.text)
                return
            # Treat any failure as an expired/invalid token.
            Tevolve.token = None
            Tevolve.token_manager()

    @staticmethod
    def post_websocket():
        """Authenticate, handshake, and join the ``/api/v2/socket_io`` namespace.

        Must be called before ``create_websocket`` so that ``token`` and
        ``sid`` are valid for the websocket upgrade.
        """
        Tevolve.token_manager()
        Tevolve.get_sid()
        url = (f"{Tevolve.API_BASE}/socket.io/?token={Tevolve.token}"
               f"&dev_id={Tevolve.device_id}&EIO=3&transport=polling&t=Ntb6xXn"
               f"&sid={Tevolve.sid}")
        headers = {
            'Cookie': "io=" + Tevolve.sid,
            'Content-Type': 'text/plain',
            'Connection': 'keep-alive'
        }
        requests.post(url, headers=headers,
                      data=Tevolve._namespace_connect_payload(),
                      timeout=Tevolve.REQUEST_TIMEOUT)

    # --- websocket -----------------------------------------------------------
    @staticmethod
    def create_websocket():
        """Open the websocket and keep it open; this call never returns.

        Incoming messages are stored in ``Tevolve.websocket_message``. When
        the connection closes or errors, the handshake is redone and a new
        websocket is opened. Reconnection is driven by a loop here rather
        than from the close/error callbacks: reconnecting inside a callback
        nests one ``run_forever`` inside another on every drop and fires
        twice when an error is followed by a close.
        """
        heartbeat_count = 360
        heartbeat_interval = 10

        def on_message(ws, message):
            Tevolve.websocket_message = message

        def on_open(ws):
            def run():
                print("websocket (run)")
                # Engine.IO upgrade: probe the new transport, then confirm.
                ws.send("2probe")
                time.sleep(2)
                ws.send("5")
                # Engine.IO ping every 10 s; stops after 360 pings.
                for beat in range(1, heartbeat_count + 1):
                    ws.send("2")
                    if beat < heartbeat_count:
                        time.sleep(heartbeat_interval)

            threading.Thread(target=run, daemon=True).start()

        def on_close(ws, close_status_code, close_msg):
            print("WEB SOCKET ClOSED")

        def on_error(ws, err):
            print("WEB SOCKET ERROR")

        websocket.enableTrace(False)
        while True:
            time.sleep(5)
            websocket_url = (f"{Tevolve.WS_BASE}/socket.io/?token={Tevolve.token}"
                             f"&dev_id={Tevolve.device_id}&EIO=3&transport=websocket"
                             f"&sid={Tevolve.sid}")
            ws = websocket.WebSocketApp(websocket_url,
                                        on_message=on_message,
                                        on_open=on_open,
                                        on_close=on_close,
                                        on_error=on_error)
            # Returns once the connection has been closed or has failed.
            ws.run_forever(ping_interval=10,
                           ping_payload='42/api/v2/socket_io,["message","ping"]')
            try:
                Tevolve.post_websocket()
            except requests.RequestException as exc:
                # Keep the loop alive; the next connection attempt fails
                # quickly and the handshake is retried afterwards.
                print("WEB SOCKET RE-HANDSHAKE FAILED", exc)

    # --- REST heater status --------------------------------------------------
    @staticmethod
    def set_mode(heater_id):
        """Send ``Tevolve.setmode`` to heater ``heater_id``.

        The value ``"heat"`` is translated to the API's ``"manual"``; any
        other value is sent unchanged. Returns ``None``.
        """
        mode = "manual" if Tevolve.setmode == "heat" else Tevolve.setmode
        requests.post(Tevolve._status_url(heater_id),
                      headers=Tevolve._api_headers(),
                      data=json.dumps({"mode": mode}),
                      timeout=Tevolve.REQUEST_TIMEOUT)

    @staticmethod
    def get_mode(heater_id):
        """Return the current mode of heater ``heater_id``.

        The API's ``"manual"`` is reported as ``"heat"`` (the inverse of
        ``set_mode``); every other mode is returned as the API reports it.
        Returns ``None`` when the status request does not succeed.
        """
        status = Tevolve.get_status(heater_id)
        if status is None:
            return None
        mode = status["mode"]
        return "heat" if mode == "manual" else mode

    @staticmethod
    def get_status(heater_id):
        """Return the decoded status JSON of heater ``heater_id``.

        Returns ``None`` when the response code is neither 200 nor 201.
        """
        response = requests.get(Tevolve._status_url(heater_id),
                                headers=Tevolve._api_headers(),
                                timeout=Tevolve.REQUEST_TIMEOUT)
        if response.status_code in (200, 201):
            return response.json()
        return None

    @staticmethod
    def set_temperature(heater_id, set_temperature):
        """Set the target temperature of heater ``heater_id`` in Celsius.

        ``set_temperature`` may be anything ``float()`` accepts; it is sent
        as a decimal string together with ``mode: manual``. Returns ``None``.

        Raises:
            ValueError: if ``set_temperature`` cannot be converted to float.
        """
        payload = json.dumps({
            "stemp": str(float(set_temperature)),
            "units": "C",
            "mode": "manual"
        })
        requests.post(Tevolve._status_url(heater_id),
                      headers=Tevolve._api_headers(),
                      data=payload,
                      timeout=Tevolve.REQUEST_TIMEOUT)