MQTT Integration¶
BSPump provides MQTT integration through bspump.mqtt.
Installation¶
pip install aiomqtt
Components¶
MQTTConnection: Shared connection to MQTT broker
MQTTSource: Subscribes to MQTT topics
MQTTSink: Publishes to MQTT topics
MQTTConnection¶
import bspump.mqtt
connection = bspump.mqtt.MQTTConnection(app, "MQTTConnection")
Configuration:
[connection:MQTTConnection]
host=localhost
port=1883
# Authentication (optional)
username=user
password=${MQTT_PASSWORD}
# TLS (optional)
# tls=true
# ca_certs=/path/to/ca.pem
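To make the option semantics concrete, the following is a minimal sketch of reading such a section with the Python standard library only. It is not how BSPump loads its configuration: the helper name `load_connection_settings`, the use of `os.path.expandvars` to resolve `${MQTT_PASSWORD}`, and the fallback values (taken from the Configuration Reference on this page) are assumptions made for illustration.

```python
import configparser
import os

EXAMPLE_CONFIG = """
[connection:MQTTConnection]
host=localhost
port=1883
username=user
password=${MQTT_PASSWORD}
"""


def load_connection_settings(text, section="connection:MQTTConnection"):
    """Hypothetical helper: parse one connection section into a plain dict."""
    # interpolation=None keeps "${...}" literal so it can be expanded below.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    raw = parser[section]
    return {
        "host": raw.get("host", fallback="localhost"),
        "port": raw.getint("port", fallback=1883),
        "username": raw.get("username", fallback=""),
        # Assumption: the placeholder is resolved from the process environment.
        "password": os.path.expandvars(raw.get("password", fallback="")),
        "tls": raw.getboolean("tls", fallback=False),
    }
```

Keeping the password in an environment variable means the secret does not have to be stored in the configuration file itself.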
MQTTSource¶
Subscribe to MQTT topics.
import bspump.mqtt
source = bspump.mqtt.MQTTSource(
    app, pipeline,
    connection="MQTTConnection"
)
Configuration:
[pipeline:MyPipeline:MQTTSource]
topic=sensors/#
# Subscribe to multiple topics
# topics=sensors/temperature,sensors/humidity
MQTTSink¶
Publish messages to MQTT topics.
import bspump.mqtt
sink = bspump.mqtt.MQTTSink(
    app, pipeline,
    connection="MQTTConnection"
)
Configuration:
[pipeline:MyPipeline:MQTTSink]
topic=processed/sensors
qos=1
Complete Example¶
from bspump.jupyter import *
import bspump.mqtt
import json
@register_connection
def mqtt_connection(app):
    return bspump.mqtt.MQTTConnection(app, "MQTTConnection")

auto_pipeline(
    source=lambda app, pipeline: bspump.mqtt.MQTTSource(
        app, pipeline, connection="MQTTConnection"
    ),
    sink=lambda app, pipeline: bspump.mqtt.MQTTSink(
        app, pipeline, connection="MQTTConnection"
    ),
    name="MQTTProcessingPipeline",
)
# Process MQTT message
data = json.loads(event.decode("utf-8"))
data["processed"] = True
event = json.dumps(data).encode("utf-8")
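The processing step at the end of the example can be exercised without a broker by wrapping the same three lines in a plain function. This is only a sketch: the function name `process_event` and the check that the payload is a JSON object are assumptions added here, not part of BSPump.

```python
import json


def process_event(event: bytes) -> bytes:
    """Decode a JSON payload, mark it as processed, and re-encode it."""
    data = json.loads(event.decode("utf-8"))
    if not isinstance(data, dict):
        # Assumption: payloads are JSON objects; anything else is rejected.
        raise ValueError("expected a JSON object payload")
    data["processed"] = True
    return json.dumps(data).encode("utf-8")
```

For example, `process_event(b'{"temperature": 21.5}')` returns the same object with `"processed": true` added, encoded as UTF-8 bytes.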
Topic Wildcards¶
MQTT supports wildcard subscriptions:
The + wildcard matches a single level: sensors/+/temperature
The # wildcard matches multiple levels: sensors/#
[pipeline:MyPipeline:MQTTSource]
# Match all sensor topics
topic=sensors/#
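The wildcard rules can be illustrated with a small matcher. In practice the broker performs this matching; the function `topic_matches` below is a hypothetical, simplified sketch that follows the MQTT rules for `+` and `#` but ignores the special handling of topics that begin with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` is matched by the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent
            # level and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; '+' matches any one level.
            return False
    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

With this sketch, `sensors/+/temperature` matches `sensors/kitchen/temperature` but not `sensors/kitchen/humidity`, while `sensors/#` matches both.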
Dynamic Topic Routing¶
Route messages to different topics:
import bspump

class TopicRouter(bspump.Processor):
    def process(self, context, event):
        # Choose the target topic from the event's sensor type
        sensor_type = event.get("sensor_type", "unknown")
        context["mqtt_topic"] = f"processed/{sensor_type}"
        return event
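The routing logic can be tested in isolation with plain dictionaries. The sketch below is an illustration, not BSPump code: `topic_for` and `route` are hypothetical helpers, and the fallback to `unknown` for values containing `+`, `#`, or `/` is an added safeguard, since MQTT does not allow wildcard characters in publish topic names and a `/` would introduce extra topic levels.

```python
def topic_for(event: dict, prefix: str = "processed") -> str:
    """Build the target topic for an event from its sensor type."""
    sensor_type = str(event.get("sensor_type", "unknown"))
    # Wildcards are not valid in publish topics; '/' would add levels.
    if not sensor_type or any(ch in sensor_type for ch in "+#/"):
        sensor_type = "unknown"
    return f"{prefix}/{sensor_type}"


def route(context: dict, event: dict) -> dict:
    """Mirror the processor body: store the topic in the context."""
    context["mqtt_topic"] = topic_for(event)
    return event
```

Keeping the topic computation in a separate function makes it straightforward to unit-test the mapping from event fields to topics.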
QoS Levels¶
MQTT supports three Quality of Service levels:
QoS 0: At most once (fire and forget)
QoS 1: At least once (acknowledged delivery)
QoS 2: Exactly once (guaranteed delivery)
[pipeline:MyPipeline:MQTTSink]
topic=alerts
# Exactly once for critical messages
qos=2
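Because only the values 0, 1, and 2 are meaningful, it can help to validate a configured `qos` value before using it. The following is a minimal sketch; the `QoS` enum and the `parse_qos` helper are hypothetical names and not part of BSPump.

```python
from enum import IntEnum


class QoS(IntEnum):
    """The three MQTT Quality of Service levels."""
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


def parse_qos(value: str) -> QoS:
    """Convert a configuration string to a QoS level, rejecting bad input."""
    try:
        return QoS(int(value.strip()))
    except ValueError as exc:
        raise ValueError(f"qos must be 0, 1, or 2, got {value!r}") from exc
```

Anything that is not a bare 0, 1, or 2 raises `ValueError`, so a mistyped value is caught at startup rather than at publish time.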
Retained Messages¶
Publish retained messages that persist on the broker:
import bspump

class RetainProcessor(bspump.Processor):
    def process(self, context, event):
        # Ask the sink to publish this message with the retain flag set
        context["mqtt_retain"] = True
        return event
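To illustrate what "persist on the broker" means, here is a small in-memory model of retained-message behavior as defined by MQTT: the broker keeps the last retained payload per topic, hands it to new subscribers, and clears it when a retained message with an empty payload is published. The class `RetainedStore` is a hypothetical teaching aid, not a broker implementation, and it only handles exact topic names.

```python
class RetainedStore:
    """Toy model of a broker's retained-message table (exact topics only)."""

    def __init__(self):
        self._retained = {}

    def publish(self, topic: str, payload: bytes, retain: bool = False) -> None:
        if not retain:
            # Non-retained messages leave the stored value untouched.
            return
        if payload == b"":
            # An empty retained payload clears the stored message.
            self._retained.pop(topic, None)
        else:
            # Only the most recent retained payload is kept per topic.
            self._retained[topic] = payload

    def on_subscribe(self, topic: str):
        """Return the retained payload a new subscriber would receive, if any."""
        return self._retained.get(topic)
```

This is why retained messages suit "last known state" topics: a client that subscribes late still receives the most recent value immediately.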
MQTT Pipeline Inspection¶
BSPump supports MQTT-based pipeline inspection. See MQTT Pipeline Inspection & Operation Protocol for details on using MQTT topics to inspect running pipelines.
Configuration Reference¶
Connection Options
| Option | Default | Description |
|---|---|---|
| host | localhost | MQTT broker host |
| port | 1883 | MQTT broker port |
| username | (empty) | Authentication username |
| password | (empty) | Authentication password |
| tls | false | Enable TLS |
Source Options
| Option | Default | Description |
|---|---|---|
| topic | (required) | Topic to subscribe to |
| qos | 0 | Subscription QoS level |
Sink Options
| Option | Default | Description |
|---|---|---|
| topic | (required) | Topic to publish to |
| qos | 0 | Publish QoS level |
| retain | false | Retain messages |