Source code for bspump.mqtt.source

import asyncio
import logging

from ..abc.source import Source

# Module-level logger, named after this module's import path.
L = logging.getLogger(__name__)


class MQTTSource(Source):
    """Pipeline source that feeds MQTT message payloads into the pipeline.

    On construction the source subscribes to the configured topic on the
    located connection and registers :meth:`on_message` as the handler for
    that topic.  Payloads passed to the handler are buffered in an internal
    :class:`asyncio.Queue` and processed one at a time by :meth:`main`.

    Configuration options (see ``ConfigDefaults``):

    * ``topic`` -- topic filter to subscribe to, default ``"#"``.
    * ``qos`` -- quality-of-service value passed to the subscription,
      default ``0``.
    """

    ConfigDefaults = {"topic": "#", "qos": 0}

    def __init__(self, app, pipeline, connection, id=None, config=None):
        """Subscribe to the configured topic and hook up the message handler.

        :param app: application object, forwarded to the base class and to
            ``pipeline.locate_connection``.
        :param pipeline: pipeline this source belongs to; also used to
            locate the connection.
        :param connection: connection reference resolved through
            ``pipeline.locate_connection(app, connection)``.
        :param id: optional identifier forwarded to the base class.
        :param config: optional configuration forwarded to the base class.
        """
        super().__init__(app, pipeline, id=id, config=config)

        # The buffer must exist before the handler is registered: on_message
        # writes into it, so a message delivered during or right after
        # registration would otherwise fail with AttributeError.
        self._queue = asyncio.Queue()

        self.Connection = pipeline.locate_connection(app, connection)

        topic = self.Config["topic"]
        self.Connection.subscribe_topic(topic, self.Config["qos"])
        self.Connection.register_handler(topic, self.on_message)

    async def main(self):
        """Forward buffered payloads to the pipeline until cancelled.

        Each iteration waits for the pipeline to be ready, takes the next
        payload from the internal queue and passes it to ``self.process``.
        Cancellation ends the loop silently.  Any other exception is logged
        and reported to the pipeline through ``set_error``.
        """
        try:
            while True:
                # Wait for the pipeline before dequeuing, so a payload is
                # only taken from the buffer when it can be processed.
                await self.Pipeline.ready()
                event = await self._queue.get()
                await self.process(event)
        except asyncio.CancelledError:
            # Cancellation of this coroutine is not treated as an error.
            pass
        except BaseException as e:
            L.exception("Error when processing message.")
            self.Pipeline.set_error(None, None, e)

    def on_message(self, client, userdata, message):
        """Buffer the payload of an incoming message for :meth:`main`.

        :param client: unused here.
        :param userdata: unused here.
        :param message: received message; only ``message.payload`` is read.

        NOTE(review): ``asyncio.Queue`` is not thread-safe -- confirm the
        connection invokes this handler on the event-loop thread.
        """
        self._queue.put_nowait(message.payload)