Complete revision of the project

parent 6329b7da
from machine import Timer
class Task(Timer):
def __init__(self, period: int, task):
super().__init__(-1)
self._period = period
self._task = task
self._timer = Timer(-1)
self.init(period=self._period, mode=Timer.PERIODIC, callback=self._task)
......@@ -37,7 +37,7 @@ class Board:
return wrapper
def __init__(self, callback: "Callable[None,None]" = None):
def __init__(self, callback_button: "Callable[None,None]" = None):
self._red = Pin(self.PIN_LED_RED, Pin.OUT)
self._green = Pin(self.PIN_LED_GREEN, Pin.OUT)
self._blue = Pin(self.PIN_LED_BLUE, Pin.OUT)
......@@ -46,15 +46,13 @@ class Board:
self._green.on()
self._blue.on()
self._callback = callback
self._armed = True
self._callback_button = callback_button
self._button = Pin(self.PIN_BUTTON, Pin.IN)
self._button.irq(trigger=Pin.IRQ_RISING, handler=self._button_pressed)
def setLED(self, color: int):
def set_led(self, color: int):
if color is self.LED_RED:
self._red.off()
self._green.on()
......@@ -70,4 +68,5 @@ class Board:
@Decorator.Debounce(2)
def _button_pressed(self, _):
self._callback()
if self._callback_button is not None:
self._callback_button()
import gc
import machine
import micropython
import ubinascii
import ujson
from umqtt.robust import MQTTClient
from umqtt.simple import MQTTClient
from utime import sleep_ms
import config
import interfaces
import interrupt
from Task import Task
from board import Board
_status = False
micropython.alloc_emergency_exception_buf(100)
_status = None
def _updates_led(_):
if _status is None:
board.set_led(board.LED_BLUE)
elif _status:
board.set_led(board.LED_GREEN)
else:
board.set_led(board.LED_RED)
def _hq_changed_status(_, msg):
......@@ -20,27 +33,34 @@ def _hq_changed_status(_, msg):
print("The HQs status was changed to {}".format(data["status"]))
if data["status"] == True:
_status = True
board.setLED(board.LED_GREEN)
else:
_status = False
board.setLED(board.LED_RED)
def _hq_changes_status():
if not interfaces.sta.isConnected():
return
global _status
data = {
"id": config.BUTTON_ID,
"status": not _status
}
print("The HQs status will be changed to {}".format(data["status"]))
mqtt.publish(config.MQTT_TOPIC, ujson.dumps(data), retain=True)
try:
mqtt.publish(config.MQTT_TOPIC, ujson.dumps(data), retain=True)
print("The HQs status will be changed to {}".format(data["status"]))
except:
print("Impossible to publish a status change")
_status = None
board = Board(callback=_hq_changes_status)
# Create interface to the board
board = Board(callback_button=_hq_changes_status)
# Create task to update the LED
task_update_status = Task(period=500, task=_updates_led)
# Create an instance to connect to mqtt server
mqtt = MQTTClient(
ubinascii.hexlify(machine.unique_id()),
config.MQTT_SERVER,
......@@ -48,26 +68,74 @@ mqtt = MQTTClient(
password=config.MQTT_PASSWORD
)
# Set the callback to the changes of the mqtt's topic
mqtt.set_callback(_hq_changed_status)
while interrupt.isRunning():
if not interfaces.sta.isConnected():
board.setLED(board.LED_BLUE)
if not interfaces.sta.try_to_connect(config.WIFI_SSID, config.WIFI_KEY):
continue
if not interfaces.sta.try_to_connect(config.WIFI_SSID, config.WIFI_KEY):
_status = None
print("Failed to connect to Wi-Fi")
continue
try:
mqtt.connect()
mqtt.subscribe(config.MQTT_TOPIC)
print("Connected to mqtt server")
except Exception as e:
print("Error to connect to mqtt server")
print("> {}".format(e))
continue
mqtt.connect()
mqtt.subscribe(config.MQTT_TOPIC)
break
while interrupt.isRunning():
def try_to_connect():
if not interfaces.sta.try_to_connect(config.WIFI_SSID, config.WIFI_KEY):
print("Failed to connect to Wi-Fi")
return False
if _status is None:
try:
print("Try to reconnect to mqtt server")
mqtt.connect()
print("Try to re-subscribe to mqtt topic")
mqtt.subscribe(config.MQTT_TOPIC)
print("Reconnected to mqtt server")
except Exception as e:
print("Error to reconnect to mqtt server")
print("> {}".format(e))
return False
return True
def work(_):
global _status
gc.collect()
print("#Free memory {}Byte".format(gc.mem_free()))
if not interfaces.sta.isConnected():
board.setLED(board.LED_BLUE)
if not interfaces.sta.try_to_connect(config.WIFI_SSID, config.WIFI_KEY):
continue
try:
print("Try to retrieve a mqtt message")
mqtt.check_msg()
except Exception as e:
_status = None
print("Error in retrieving of mqtt message")
print("> {}".format(e))
mqtt.check_msg()
if not try_to_connect():
return
task_work = Task(period=2000, task=work)
while interrupt.isRunning():
sleep_ms(500)
task_update_status.deinit()
task_work.deinit()
board._red.off()
board._green.off()
board._blue.off()
import upip
upip.install("micropython-functools")
upip.install("micropython-umqtt.robust")
upip.install("micropython-umqtt.simple")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment