Source code for bspump.mqtt.connection

import paho.mqtt.client as mqtt
import logging

from ..abc.connection import Connection

L = logging.getLogger(__name__)


[docs] class MQTTConnection(Connection): ConfigDefaults = { "broker": "localhost:1883", "keepalive": 60, "username": "", "password": "", }
[docs] def __init__(self, app, id=None, config=None): super().__init__(app, id=id, config=config) self._client = mqtt.Client(transport="websockets") broker = self.Config["broker"] self._host, self._port = broker.split(":") if self._port == "443": # communication between public broker self._client.tls_set(cert_reqs=mqtt.ssl.CERT_REQUIRED) else: # internal communication between broker self._client.tls_set(cert_reqs=mqtt.ssl.CERT_NONE) if self.Config["username"] != "" and self.Config["password"] != "": self._client.username_pw_set( self.Config["username"], self.Config["password"] ) self._client.connect(self._host, int(self._port), int(self.Config["keepalive"])) self._dispatcher = MQTTMessageDispatcher() self._client.on_connect = self.on_connect self._client.on_message = self._dispatcher.dispatch self.Connected = False self._client.loop_start()
[docs] def on_connect(self, client, userdata, flags, rc, properties=None): L.info(f"Connected to MQTT broker: {self._host}:{self._port}") self.Connected = True
[docs] def register_handler(self, topic, handler): self._dispatcher.register(topic, handler)
[docs] def subscribe_topic(self, topic, qos=0): self._client.subscribe(topic, qos)
[docs] def publish_to_topic(self, topic, payload, qos=0, retain=False): self._client.publish(topic, payload, qos, retain)
class MQTTMessageDispatcher: def __init__(self): self.handlers = {} def register(self, topic, handler): self.handlers[topic] = handler def dispatch(self, client, userdata, message): topic = message.topic if topic in self.handlers: self.handlers[topic](client, userdata, message) else: L.warning(f"No handler for topic {topic}")