Deep restyling of files

parent d67b43a5
......@@ -9,6 +9,15 @@ class Task(Timer):
self._task = task
self._timer = Timer(-1)
def init(self):
super().init(period=self._period, mode=Timer.PERIODIC, callback=self._task)
self.init(period=self._period, mode=Timer.PERIODIC, callback=self._task)
class TasksList(dict):
def init_all(self):
for _, i in self.items():
i.init()
def deinit_all(self):
for _, i in self.items():
i.deinit()
......@@ -14,6 +14,7 @@ class Board:
LED_RED = 0
LED_GREEN = 1
LED_BLUE = 2
LED_WHITE = 3
class Decorator:
class Debounce:
......@@ -65,6 +66,10 @@ class Board:
self._red.on()
self._green.on()
self._blue.off()
elif color is self.LED_WHITE:
self._red.off()
self._green.off()
self._blue.off()
@Decorator.Debounce(2)
def _button_pressed(self, _):
......
import ujson
import config
import env
def _hq_changed_status(_, msg):
data = ujson.loads(msg)
print("The HQs status was changed to {}".format(data["status"]))
if data["status"] == True:
env.status = True
else:
env.status = False
def _hq_changes_status():
data = {
"id": config.BUTTON_ID,
"status": not env.status
}
try:
env.mqtt.publish(config.MQTT_TOPIC, ujson.dumps(data), retain=True)
print("The HQs status will be changed to {}".format(data["status"]))
except:
print("Impossible to publish a status change")
env.status = None
status = None
board = None
mqtt = None
tasks_list = None
\ No newline at end of file
import machine
import ubinascii
from umqtt.simple import MQTTClient
import config
import env
from Task import TasksList, Task
from board import Board
from callbacks import _hq_changed_status, _hq_changes_status
from tasks import _updates_led, _stay_connected, _work
def init():
env.status = None
# Create interface to the board
env.board = Board(callback_button=_hq_changes_status)
# Create an instance to connect to mqtt server
env.mqtt = MQTTClient(
ubinascii.hexlify(machine.unique_id()),
config.MQTT_SERVER,
user=config.MQTT_USER,
password=config.MQTT_PASSWORD
)
# Set the callback to the changes of the mqtt's topic
env.mqtt.set_callback(_hq_changed_status)
# Create a list of tasks
env.tasks_list = TasksList()
# Create task to update the LED
env.tasks_list["update_status"] = Task(period=500, task=_updates_led)
# Create task to hold the connection
env.tasks_list["stay_connected"] = Task(period=5000, task=_stay_connected)
# Create the main task
env.tasks_list["work"] = Task(period=2000, task=_work)
import gc
import machine
import micropython
import ubinascii
import ujson
from umqtt.simple import MQTTClient
from utime import sleep_ms
import config
import interfaces
import env
import interrupt
from Task import Task
from board import Board
from init import init
micropython.alloc_emergency_exception_buf(100)
_status = None
def _updates_led(_):
if _status is None:
board.set_led(board.LED_BLUE)
elif _status:
board.set_led(board.LED_GREEN)
else:
board.set_led(board.LED_RED)
def _hq_changed_status(_, msg):
global _status
data = ujson.loads(msg)
print("The HQs status was changed to {}".format(data["status"]))
if data["status"] == True:
_status = True
else:
_status = False
def _hq_changes_status():
global _status
data = {
"id": config.BUTTON_ID,
"status": not _status
}
try:
mqtt.publish(config.MQTT_TOPIC, ujson.dumps(data), retain=True)
print("The HQs status will be changed to {}".format(data["status"]))
except:
print("Impossible to publish a status change")
_status = None
# Create interface to the board
board = Board(callback_button=_hq_changes_status)
# Create task to update the LED
task_update_status = Task(period=500, task=_updates_led)
# Create an instance to connect to mqtt server
mqtt = MQTTClient(
ubinascii.hexlify(machine.unique_id()),
config.MQTT_SERVER,
user=config.MQTT_USER,
password=config.MQTT_PASSWORD
)
# Set the callback to the changes of the mqtt's topic
mqtt.set_callback(_hq_changed_status)
while interrupt.isRunning():
if not interfaces.sta.try_to_connect(config.WIFI_SSID, config.WIFI_KEY):
_status = None
print("Failed to connect to Wi-Fi")
continue
try:
mqtt.connect()
mqtt.subscribe(config.MQTT_TOPIC)
print("Connected to mqtt server")
except Exception as e:
print("Error to connect to mqtt server")
print("> {}".format(e))
continue
break
def try_to_connect():
if not interfaces.sta.try_to_connect(config.WIFI_SSID, config.WIFI_KEY):
print("Failed to connect to Wi-Fi")
return False
if _status is None:
try:
print("Try to reconnect to mqtt server")
mqtt.connect()
print("Try to re-subscribe to mqtt topic")
mqtt.subscribe(config.MQTT_TOPIC)
print("Reconnected to mqtt server")
except Exception as e:
print("Error to reconnect to mqtt server")
print("> {}".format(e))
return False
return True
def work(_):
global _status
gc.collect()
print("#Free memory {}Byte".format(gc.mem_free()))
try:
print("Try to retrieve a mqtt message")
mqtt.check_msg()
except Exception as e:
_status = None
print("Error in retrieving of mqtt message")
print("> {}".format(e))
if not try_to_connect():
return
init()
# tasks_list["update_status"].init()
env.tasks_list.init_all()
# while interrupt.isRunning():
# if not interfaces.sta.try_to_connect(config.WIFI_SSID, config.WIFI_KEY):
# _status = None
# print("Failed to connect to Wi-Fi")
# continue
#
# try:
# mqtt.connect()
# mqtt.subscribe(config.MQTT_TOPIC)
# print("Connected to mqtt server")
#
# except Exception as e:
# print("Error to connect to mqtt server")
# print("> {}".format(e))
# continue
#
# break
task_work = Task(period=2000, task=work)
while interrupt.isRunning():
sleep_ms(500)
task_update_status.deinit()
task_work.deinit()
env.tasks_list.deinit_all()
board._red.off()
board._green.off()
board._blue.off()
env.board.set_led(Board.LED_WHITE)
import gc
import config
import env
import interfaces
def _updates_led(_):
if env.status is None:
env.board.set_led(env.board.LED_BLUE)
elif env.status:
env.board.set_led(env.board.LED_GREEN)
else:
env.board.set_led(env.board.LED_RED)
def _stay_connected(_):
if not interfaces.sta.try_to_connect(config.WIFI_SSID, config.WIFI_KEY):
print("Failed to connect to Wi-Fi")
return False
if env.status is None:
try:
print("Try to reconnect to mqtt server")
env.mqtt.connect()
print("Try to re-subscribe to mqtt topic")
env.mqtt.subscribe(config.MQTT_TOPIC)
print("Reconnected to mqtt server")
except Exception as e:
print("Error to reconnect to mqtt server")
print("> {}".format(e))
return False
return True
def _work(_):
gc.collect()
print("#Free memory {}Byte".format(gc.mem_free()))
try:
print("Try to retrieve a mqtt message")
env.mqtt.check_msg()
except Exception as e:
status = None
print("Error in retrieving of mqtt message")
print("> {}".format(e))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment