MQTT Integration

BSPump provides MQTT integration through the bspump.mqtt module.

Installation

pip install aiomqtt

Components

  • MQTTConnection: Shared connection to MQTT broker

  • MQTTSource: Subscribes to MQTT topics

  • MQTTSink: Publishes to MQTT topics

MQTTConnection

import bspump.mqtt

connection = bspump.mqtt.MQTTConnection(app, "MQTTConnection")

Configuration:

[connection:MQTTConnection]
host=localhost
port=1883

# Authentication (optional)
username=user
password=${MQTT_PASSWORD}

# TLS (optional)
# tls=true
# ca_certs=/path/to/ca.pem
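
To sanity-check a connection section outside BSPump, it can be read with Python's standard configparser. This is only a sketch: load_connection_settings is a hypothetical helper, the fallback values are taken from the Configuration Reference on this page, and os.path.expandvars is used as a stand-in for however BSPump itself resolves ${MQTT_PASSWORD}, which this document does not describe.

```python
import configparser
import os

EXAMPLE_CONFIG = """
[connection:MQTTConnection]
host=localhost
port=1883
username=user
password=${MQTT_PASSWORD}
"""


def load_connection_settings(config_text, section="connection:MQTTConnection"):
    """Hypothetical helper: return the connection options as a plain dict."""
    # interpolation=None keeps "${...}" untouched so it can be expanded below
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    options = parser[section]
    return {
        "host": options.get("host", fallback="localhost"),
        "port": options.getint("port", fallback=1883),
        "username": options.get("username", fallback=""),
        # Assumption: environment expansion is emulated with os.path.expandvars
        "password": os.path.expandvars(options.get("password", fallback="")),
        "tls": options.getboolean("tls", fallback=False),
    }


settings = load_connection_settings(EXAMPLE_CONFIG)
print(settings["host"], settings["port"], settings["tls"])
```

If MQTT_PASSWORD is not set in the environment, os.path.expandvars leaves the placeholder unchanged, which makes a missing secret easy to spot.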

MQTTSource

Subscribe to MQTT topics.

import bspump.mqtt

source = bspump.mqtt.MQTTSource(
    app, pipeline,
    connection="MQTTConnection"
)

Configuration:

[pipeline:MyPipeline:MQTTSource]
topic=sensors/#
# Subscribe to multiple topics
# topics=sensors/temperature,sensors/humidity

MQTTSink

Publish messages to MQTT topics.

import bspump.mqtt

sink = bspump.mqtt.MQTTSink(
    app, pipeline,
    connection="MQTTConnection"
)

Configuration:

[pipeline:MyPipeline:MQTTSink]
topic=processed/sensors
qos=1

Complete Example

from bspump.jupyter import *
import bspump.mqtt
import json

@register_connection
def mqtt_connection(app):
    return bspump.mqtt.MQTTConnection(app, "MQTTConnection")

auto_pipeline(
    source=lambda app, pipeline: bspump.mqtt.MQTTSource(
        app, pipeline, connection="MQTTConnection"
    ),
    sink=lambda app, pipeline: bspump.mqtt.MQTTSink(
        app, pipeline, connection="MQTTConnection"
    ),
    name="MQTTProcessingPipeline",
)

# Process MQTT message
data = json.loads(event.decode("utf-8"))
data["processed"] = True
event = json.dumps(data).encode("utf-8")
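
The processing step above can be tried without a broker or a pipeline. The sketch below wraps the same decode, modify, encode sequence in a standalone function; mark_processed is a hypothetical name, and the check for a JSON object is an added assumption about the payload shape.

```python
import json


def mark_processed(payload: bytes) -> bytes:
    """Decode a JSON payload, set "processed" to True, and encode it again."""
    data = json.loads(payload.decode("utf-8"))
    if not isinstance(data, dict):
        # Assumption: payloads are JSON objects, as in the example above
        raise ValueError("expected a JSON object payload")
    data["processed"] = True
    return json.dumps(data).encode("utf-8")


result = mark_processed(b'{"sensor_type": "temperature", "value": 21.5}')
print(json.loads(result))
```

Payloads that are not valid UTF-8 or not valid JSON raise UnicodeDecodeError or json.JSONDecodeError respectively; both are subclasses of ValueError, so a single except ValueError clause covers malformed messages.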

Topic Wildcards

MQTT supports wildcard subscriptions:

  • + matches exactly one level: sensors/+/temperature

  • # matches any number of remaining levels and must be the last character of the filter: sensors/#

[pipeline:MyPipeline:MQTTSource]
# Match all sensor topics
topic=sensors/#
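
To check which topics a filter would receive before deploying it, the matching rules can be sketched in a few lines. topic_matches is a hypothetical helper written for illustration, not part of bspump.mqtt or aiomqtt, and it does not apply the special handling that MQTT gives to topics beginning with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT subscription `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # everything that remains, including the parent level itself
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter has more levels than the topic
            return False
        if level != "+" and level != topic_levels[index]:
            # Literal level that differs ("+" accepts any single level)
            return False
    # Without "#", both must have the same number of levels
    return len(filter_levels) == len(topic_levels)


for candidate in ("sensors/kitchen/temperature", "sensors/temperature", "alerts/fire"):
    print(candidate, topic_matches("sensors/+/temperature", candidate))
```

Note that sensors/# also matches the bare topic sensors, while sensors/+ matches only topics with exactly two levels.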

Dynamic Topic Routing

A processor can choose the publish topic for each event by setting mqtt_topic in the context:

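import bspump

class TopicRouter(bspump.Processor):
    def process(self, context, event):
        # Assumes the event is a dict; fall back to "unknown" when the key is missing
        sensor_type = event.get("sensor_type", "unknown")
        # Store the per-event publish topic in the context
        context["mqtt_topic"] = f"processed/{sensor_type}"
        return event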
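
When the topic is built from event data, the value should be a valid single topic level: MQTT does not allow the wildcards + and # in publish topics, and a / would introduce an extra level. A minimal sketch of a defensive topic builder follows; topic_for and its prefix parameter are hypothetical and independent of BSPump.

```python
def topic_for(event: dict, prefix: str = "processed") -> str:
    """Build a publish topic from an event's sensor_type field."""
    sensor_type = str(event.get("sensor_type", "unknown"))
    # Replace characters that are wildcards or level separators in MQTT
    for forbidden in "+#/":
        sensor_type = sensor_type.replace(forbidden, "_")
    if not sensor_type:
        # Avoid an empty trailing level such as "processed/"
        sensor_type = "unknown"
    return f"{prefix}/{sensor_type}"


print(topic_for({"sensor_type": "temperature"}))
```

Inside a processor, the result of such a helper could be assigned to context["mqtt_topic"] in place of the f-string.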

QoS Levels

MQTT supports three Quality of Service levels:

  • QoS 0: At most once (fire and forget)

  • QoS 1: At least once (acknowledged delivery)

  • QoS 2: Exactly once (guaranteed delivery)

[pipeline:MyPipeline:MQTTSink]
topic=alerts
# Exactly once for critical messages
qos=2
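
A qos value outside 0 to 2 is invalid, so it can be worth validating the option before starting a pipeline. The sketch below assumes the file is read with Python's standard configparser in its default settings (whether BSPump configures its parser differently is not stated here); read_qos is a hypothetical helper. With those defaults, text after the value on the same line is kept as part of the value, so comments are safest on their own line.

```python
import configparser

SINK_SECTION = "pipeline:MyPipeline:MQTTSink"

EXAMPLE_CONFIG = """
[pipeline:MyPipeline:MQTTSink]
topic=alerts
# Exactly once for critical messages
qos=2
"""


def read_qos(config_text: str, section: str = SINK_SECTION) -> int:
    """Read the qos option, defaulting to 0, and check that it is 0, 1 or 2."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    raw = parser.get(section, "qos", fallback="0")
    # int() raises ValueError if the value carries trailing text
    qos = int(raw)
    if qos not in (0, 1, 2):
        raise ValueError(f"qos must be 0, 1 or 2, got {qos}")
    return qos


print(read_qos(EXAMPLE_CONFIG))
```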

Retained Messages

Set mqtt_retain in the context to publish a retained message, which the broker stores and delivers to future subscribers of the topic:

import bspump

class RetainProcessor(bspump.Processor):
    def process(self, context, event):
        # Flag this event so it is published as a retained message
        context["mqtt_retain"] = True
        return event
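
What "persist on the broker" means can be illustrated with a small in-memory model of the broker side. This is a simplified sketch of standard MQTT retain semantics, not BSPump or broker code; RetainedStore is a hypothetical class and it handles exact topic names only, without wildcards.

```python
class RetainedStore:
    """Toy model of how a broker keeps retained messages per topic."""

    def __init__(self):
        self._retained = {}

    def publish(self, topic: str, payload: bytes, retain: bool = False) -> None:
        if not retain:
            # Non-retained messages never change what is stored
            return
        if payload:
            # Only the most recent retained message per topic is kept
            self._retained[topic] = payload
        else:
            # An empty retained payload clears the stored message
            self._retained.pop(topic, None)

    def on_subscribe(self, topic: str):
        """Return the message a new subscriber would receive immediately, if any."""
        return self._retained.get(topic)


store = RetainedStore()
store.publish("status/pump", b"online", retain=True)
print(store.on_subscribe("status/pump"))
```

The practical consequence is that a retained message stays available until it is replaced or explicitly cleared with an empty retained payload.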

MQTT Pipeline Inspection

BSPump supports MQTT-based pipeline inspection. See MQTT Pipeline Inspection & Operation Protocol for details on using MQTT topics to inspect running pipelines.

Configuration Reference

Connection Options

Option    Default    Description
host      localhost  MQTT broker host
port      1883       MQTT broker port
username  (empty)    Authentication username
password  (empty)    Authentication password
tls       false      Enable TLS

Source Options

Option  Default     Description
topic   (required)  Topic to subscribe to
qos     0           Subscription QoS level

Sink Options

Option  Default     Description
topic   (required)  Topic to publish to
qos     0           Publish QoS level
retain  false       Retain messages