bspump.mqtt
MQTT integration for BSPump.
- class bspump.mqtt.MQTTConnection(app, id=None, config=None)
  Bases: Connection
  - ConfigDefaults: dict = {'broker': 'localhost:1883', 'keepalive': 60, 'password': '', 'username': ''}
  - __init__(app, id=None, config=None)
    Parameters
    - app : Application
      Specification of an Application.
    - id : default None
    - config : JSON or other compatible format, default None
      Configuration values used to create the connection.
- class bspump.mqtt.MQTTSource(app, pipeline, connection, id=None, config=None)
  Bases: Source
  - __init__(app, pipeline, connection, id=None, config=None)
    Set the initial ID, Pipeline and Task.
    Parameters
    - app : Application
      Name of an Application (https://asab.readthedocs.io/en/latest/asab/application.html).
    - pipeline : address of a pipeline
      Name of a Pipeline.
    - id : str, default None
      Name of the Pipeline.
    - config : compatible config type, default None
      Option for adding a configuration file.
- class bspump.mqtt.MQTTSink(app, pipeline, connection, id=None, config=None)
  Bases: Sink
- class bspump.mqtt.MQTTService(app, service_name='bspump.MQTTService', connection=None)
  Bases: Service
MQTTConnection
Connection to MQTT broker.
import bspump.mqtt
connection = bspump.mqtt.MQTTConnection(app, "MQTTConnection")
Configuration:
[connection:MQTTConnection]
host=localhost
port=1883
username=user
password=${MQTT_PASSWORD}
Options:
- host - MQTT broker host
- port - MQTT broker port (default: 1883)
- username - Authentication username
- password - Authentication password
- tls - Enable TLS (true/false)
- ca_certs - Path to CA certificate
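As a rough illustration of how the options above fit together, the following sketch reads the [connection:MQTTConnection] section with Python's standard configparser, applies the documented default port, and expands ${MQTT_PASSWORD} from a mapping of environment variables. It does not use BSPump itself: the helper name load_mqtt_options, the localhost fallback for host, and the environment-expansion step are assumptions made for illustration, and BSPump may resolve these values differently.

```python
import configparser
from string import Template

SAMPLE = """
[connection:MQTTConnection]
host=localhost
username=user
password=${MQTT_PASSWORD}
"""


def load_mqtt_options(text, env, section="connection:MQTTConnection"):
    """Hypothetical helper: return the connection options as a plain dict."""
    # interpolation=None keeps "${...}" literal so it can be expanded below.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    raw = parser[section]
    return {
        # Assumption: host falls back to localhost when it is not set.
        "host": raw.get("host", "localhost"),
        "port": raw.getint("port", fallback=1883),  # documented default
        "username": raw.get("username", ""),
        # Assumption: ${NAME} refers to an environment variable.
        "password": Template(raw.get("password", "")).safe_substitute(env),
        "tls": raw.getboolean("tls", fallback=False),
        "ca_certs": raw.get("ca_certs", ""),
    }
```

Calling load_mqtt_options(SAMPLE, os.environ) would pick the password up from the process environment; passing an explicit dict instead keeps the helper easy to test.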
MQTTSource
Subscribes to MQTT topics.
source = bspump.mqtt.MQTTSource(
    app, pipeline,
    connection="MQTTConnection"
)
Configuration:
[pipeline:MyPipeline:MQTTSource]
topic=sensors/#
qos=1
Options:
- topic - Topic to subscribe to
- topics - Comma-separated list of topics
- qos - Quality of Service (0, 1, 2)
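The sketch below shows one way the topic, topics and qos options could be turned into a list of subscriptions. It is plain Python, not BSPump code: the helper name resolve_subscriptions, the default qos of 0, and the rule that topic and topics are merged are all assumptions made for illustration.

```python
def resolve_subscriptions(options):
    """Hypothetical helper: return a list of (topic, qos) pairs."""
    # Assumption: qos defaults to 0 when the option is missing.
    qos = int(options.get("qos", 0))
    if qos not in (0, 1, 2):
        raise ValueError(f"qos must be 0, 1 or 2, got {qos}")

    topics = []
    if options.get("topic"):
        topics.append(options["topic"].strip())

    # Assumption: 'topics' is split on commas and surrounding spaces are ignored.
    for item in options.get("topics", "").split(","):
        item = item.strip()
        if item and item not in topics:
            topics.append(item)

    if not topics:
        raise ValueError("no topic configured")
    return [(topic, qos) for topic in topics]
```

Validating qos up front means a typo in the configuration fails at startup rather than when the first subscription is made.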
Wildcards:
- + - Single level: sensors/+/temperature
- # - Multi level: sensors/#
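To make the wildcard rules concrete, here is a small stand-alone matcher that checks a topic filter against a concrete topic name. It follows the general MQTT matching rules for + and #, is not taken from BSPump or from any MQTT client library, and does not handle the special treatment of topics that start with $. The function name topic_matches is hypothetical.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last filter level.
            # It matches the parent level and everything below it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must match exactly; "+" matches any one level.
            return False

    # Without "#", both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

For example, topic_matches("sensors/+/temperature", "sensors/kitchen/temperature") is True, while the same filter does not match "sensors/kitchen/humidity".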
MQTTSink
Publishes messages to MQTT topics.
sink = bspump.mqtt.MQTTSink(
    app, pipeline,
    connection="MQTTConnection"
)
Configuration:
[pipeline:MyPipeline:MQTTSink]
topic=processed/sensors
qos=1
retain=false
Options:
- topic - Topic to publish to
- qos - Quality of Service (0, 1, 2)
- retain - Retain messages (true/false)
Dynamic Topic
class TopicRouter(bspump.Processor):
    def process(self, context, event):
        # Derive a per-event topic from the event's sensor_id field
        context["mqtt_topic"] = f"sensors/{event['sensor_id']}"
        return event
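The topic string built in TopicRouter comes straight from event data, so it can help to validate it first. The sketch below is a stand-alone helper, not part of BSPump: the name build_topic, the prefix and fallback parameters, and the fallback behaviour are assumptions for illustration. It relies on the MQTT rule that topic names used for publishing must not contain the wildcard characters + and #.

```python
def build_topic(event, prefix="sensors", fallback="sensors/unknown"):
    """Hypothetical helper: derive a publish topic from an event dict."""
    sensor_id = str(event.get("sensor_id", "")).strip()
    # Wildcards are not allowed in publish topics, and a "/" inside the id
    # would silently add an extra topic level, so both are rejected here.
    if not sensor_id or any(ch in sensor_id for ch in "+#/"):
        return fallback
    return f"{prefix}/{sensor_id}"
```

Inside a processor, the result could be assigned to context["mqtt_topic"] in place of the f-string, so that events with a missing or malformed sensor_id still end up on a predictable topic.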
Retained Messages
class RetainProcessor(bspump.Processor):
    def process(self, context, event):
        # Mark this event to be published as a retained message
        context["mqtt_retain"] = True
        return event
Example Pipeline
import bspump
import bspump.mqtt


class MQTTProcessingPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.mqtt.MQTTSource(app, self, connection="MQTTConnection"),
            # ParseProcessor is a user-defined processor that is not shown here
            ParseProcessor(app, self),
            bspump.mqtt.MQTTSink(app, self, connection="MQTTConnection"),
        )