Commit c1554129 authored by JackV's avatar JackV

Implement MQTT listener

parent 68bc7dfc
......@@ -83,3 +83,15 @@ This is also a valid Python file, but don't abuse that.
## Minimum number of seconds between two successive MAC updates.
mac_update_interval = 0
## IP/DNS name of the MQTT server
mqtt_ip = 127.0.0.1
## Port of the MQTT server
mqtt_port = 1883
## Username to use for MQTT authentication
mqtt_user = 'bits_server'
## Password for MQTT authentication
mqtt_password = ''
......@@ -15,6 +15,7 @@ import bitsd.properties
import bitsd.server as server
import bitsd.listener as listener
import bitsd.mqtt_listener as mqtt_listener
import bitsd.persistence as persistence
from bitsd.common import LOG
......@@ -67,6 +68,7 @@ def main():
persistence.start()
server.start()
listener.start()
mqtt_listener.start()
# Add signal handlers...
signal.signal(signal.SIGTERM, sig_handler)
......
#
# Copyright (C) 2013 Stefano Sanfilippo
# Copyright (C) 2013 BITS development team
#
# This file is part of bitsd, which is released under the terms of
# GNU GPLv3. See COPYING at top level for more information.
#
"""
Listeners are server components waiting for commands on given
ports/hosts/address or events (on the countrary, a server will actually
*serve* content to the client).
"""
from tornado.options import options
import paho.mqtt.client as mqtt_client
from bitsd.common import LOG
from . import hooks
from . import notifier
def on_connect(client, userdata, flags, rc):
if not rc:
client.message_callback_add('sede/status', hooks.handle_status)
notifier.mqtt_instance = client
LOG.info('Connected!')
else:
LOG.error("Connection failed with RC: {}".format(rc))
def start():
__inject_broadcast()
LOG.info('Starting MQTT remote control...')
mqtt_c = mqtt_client.Client(client_id='bits_server',
clean_session=False)
mqtt_c.username_pw_set(options.mqtt_user,
options.mqtt_password)
mqtt_c.on_connect = on_connect
mqtt_c.connect(options.mqtt_ip, int(options.mqtt_port))
mqtt_c.loop_start()
def __inject_broadcast():
"""Lazily load broadcast() function to break circular dependencies"""
from bitsd.server.handlers import broadcast
hooks.broadcast = broadcast
#
# Copyright (C) 2013 Stefano Sanfilippo
# Copyright (C) 2013 BITS development team
#
# This file is part of bitsd, which is released under the terms of
# GNU GPLv3. See COPYING at top level for more information.
#
"""
Hooks called by `.handlers` to handle specific commands.
"""
# NOTE: don't forget to register your handler in RemoteListener.ACTIONS
# : and in __all__ below!!
import json
from bitsd.listener import notifier
from bitsd.persistence.engine import session_scope
from bitsd.persistence.models import Status
import bitsd.persistence.query as query
from bitsd.common import LOG
#: This will be initialized by bitsd.listener.start()
broadcast = None
__all__ = [
'handle_temperature',
'handle_status',
]
status_map = {'open': 1,
'closed': 0}
def handle_temperature(client, userdata, message):
"""Receives and log data received from remote sensor."""
sensorid = message.topic.split('/')[2]
value = message.payload.decode('utf8')
LOG.info('Received temperature: sensorid=%r, value=%r', sensorid, value)
try:
sensorid = int(sensorid)
value = float(value)
except ValueError:
LOG.error('Wrong type for parameters in temperature command!')
return
with session_scope() as session:
temp = query.log_temperature(session, value, sensorid, 'BITS')
broadcast(temp.jsondict())
def handle_status(client, userdata, message):
"""Update status.
Will reject two identical and consecutive updates
(prevents opening when already open and vice-versa)."""
status_msg = json.loads(message.payload.decode('utf8'))
if status_msg['id'] == 'bits_server':
return
LOG.info('Received status: %r', status_msg)
status = status_map(status_msg['status'])
try:
status = int(status)
except ValueError:
LOG.error('Wrong parameter in status command')
return
if status not in (0, 1):
LOG.error('Non existent status %r, ignoring.', status)
return
textstatus = Status.OPEN if status == 1 else Status.CLOSED
with session_scope() as session:
curstatus = query.get_current_status(session)
if curstatus is None or curstatus.value != textstatus:
status = query.log_status(session, textstatus, 'BITS')
broadcast(status.jsondict())
notifier.send_status(textstatus)
else:
LOG.error('BITS already open/closed! Ignoring.')
#
# Copyright (C) 2013 Stefano Sanfilippo
# Copyright (C) 2013 BITS development team
#
# This file is part of bitsd, which is released under the terms of
# GNU GPLv3. See COPYING at top level for more information.
#
"""
Send Web-Triggered status-change to MQTT
"""
import json
from bitsd.persistence.models import Status
from bitsd.common import LOG
mqtt_instance = None
status_rev_map = {0: 'closed',
1: 'open'}
def send_status(value):
"""
Send open or close status to the BITS Fonera.
Status can be either 0 / 1 or Status.CLOSED / Status.OPEN
"""
try:
value = int(value)
except ValueError:
value = 1 if value == Status.OPEN else 0
status = status_rev_map[value]
if mqtt_instance is None:
LOG.info("MQTT not connected yet, ignoring status notification")
else:
jstatus = json.dumps({'id': 'bits_server',
'status': status})
mqtt_instance.publish('sede/status', jstatus, retain=True)
......@@ -147,3 +147,28 @@ define("mac_update_interval",
help="Minimum number of seconds between two successive MAC updates.",
group="Networking"
)
define("mqtt_ip",
default="127.0.0.1",
help="IP/DNS name of the MQTT server",
group="Networking"
)
define("mqtt_port",
default=1883,
help="Port of the MQTT server",
group="Networking"
)
define("mqtt_user",
default="bits_server",
help="Username to use for MQTT authentication",
group="Networking"
)
define("mqtt_password",
default="",
help="Password for MQTT authentication",
group="Networking"
)
......@@ -23,6 +23,7 @@ from tornado.options import options
from tornado.web import MissingArgumentError, HTTPError, RequestHandler
import bitsd.listener.notifier as notifier
import bitsd.mqtt_listener.notifier as mqtt_notifier
import bitsd.persistence.query as query
from bitsd.common import LOG, secure_compare
from bitsd.persistence.engine import session_scope, persist
......@@ -326,6 +327,7 @@ class AdminPageHandler(BaseHandler):
status = query.log_status(session, textstatus, 'web')
broadcast(status.jsondict())
notifier.send_status(textstatus)
mqtt_notifier.send_status(textstatus)
message = "Ora la sede è {}.".format(textstatus == Status.OPEN and "aperta" or "chiusa")
except IntegrityError:
LOG.error("Status changed too quickly, not logged.")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment