-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
69 lines (57 loc) · 2.48 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# coding=utf-8
import asyncio
import threading
import time
from ha_mqtt_discoverable import Settings
from ha_mqtt_discoverable.sensors import Number, NumberInfo, Sensor, SensorInfo
from paho.mqtt.client import Client, MQTTMessage
from pylgbst import *
from pylgbst.hub import MoveHub
log = logging.getLogger("demo")
def set_power(movehub: MoveHub, numberEntity: Number, power: int):
movehub.motor_A.power(power / 100)
numberEntity.set_value(power)
# Press the green button in the gutter to run the script.
async def connect_to_hub(macAddress: str):
connection = get_connection_bleak(hub_mac=macAddress)
hub = MoveHub(connection)
return hub
async def main():
global hub
logging.basicConfig(level=logging.INFO, format='%(relativeCreated)d\t%(levelname)s\t%(name)s\t%(message)s')
mac_address = '9C:9A:C0:00:FB:F2'
mqtt_host = ''
mqtt_username = ''
mqtt_password = ''
try:
while True:
try:
hub = await connect_to_hub(mac_address)
break
except Exception as e:
pass
macAsId = mac_address.replace(":", "")
mqtt_settings = Settings.MQTT(host=mqtt_host,
username=mqtt_username,
password=mqtt_password,
client_name='movehub-connect')
# Information about the `number` entity.
number_info = NumberInfo(name="train-power-" + macAsId, min=-100, max=100, mode="slider", step=10)
settings = Settings(mqtt=mqtt_settings, entity=number_info)
def power_update(client: Client, user_data, message: MQTTMessage):
number = int(message.payload.decode())
logging.info(f"Received {number} from HA")
set_power(hub, speed_number, number)
speed_number = Number(settings, lambda client, user_data, message: power_update(client, user_data, message))
speed_number.set_value(0)
battery_info = SensorInfo(name="train-battery-" + macAsId, unique_id="train-battery-" + macAsId, unit_of_measurement="V", value_template="{{ value|float }}")
battery_settings = Settings(mqtt=mqtt_settings, entity=battery_info)
battery_sensor = Sensor(battery_settings)
while True:
battery_sensor.set_state(hub.voltage.voltage)
time.sleep(30)
finally:
if hub:
hub.disconnect()
asyncio.run(main())
# See PyCharm help at https://www.jetbrains.com/help/pycharm/