bspump.mqtt

MQTT integration for BSPump.

class bspump.mqtt.MQTTConnection(app, id=None, config=None)[source]

Bases: Connection

ConfigDefaults: dict = {'broker': 'localhost:1883', 'keepalive': 60, 'password': '', 'username': ''}
__init__(app, id=None, config=None)[source]

Description:

Parameters

appApplication

Specification of an Application.

id : default None

configJSON or other compatible format, default None

It contains important information and data responsible for creating a connection.

on_connect(client, userdata, flags, rc, properties=None)[source]
register_handler(topic, handler)[source]
subscribe_topic(topic, qos=0)[source]
publish_to_topic(topic, payload, qos=0, retain=False)[source]
class bspump.mqtt.MQTTSource(app, pipeline, connection, id=None, config=None)[source]

Bases: Source

ConfigDefaults: dict = {'qos': 0, 'topic': '#'}
__init__(app, pipeline, connection, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

appApplication

Name of an Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ .

pipelineaddress of a pipeline

Name of a Pipeline.

idstr, default None

Name of a the Pipeline.

configcompatible config type , default None

Option for adding a configuration file.

async main()[source]

Can be implemented for additional features, else will raise NotImplementedError and _main is called instead.

on_message(client, userdata, message)[source]
class bspump.mqtt.MQTTSink(app, pipeline, connection, id=None, config=None)[source]

Bases: Sink

ConfigDefaults: dict = {'qos': 0, 'retain': False, 'topic': 'default'}
__init__(app, pipeline, connection, id=None, config=None)[source]

Initializes the Parameters

Parameters

appobject

Application object.

pipelinePipeline

Name of the Pipeline.

idstr, default=None,

ID of the class of config.

configJSON, or other compatible formats, default=None

Configuration file.

process(context, event)[source]

Can be implemented to return event based on a given logic.

Parameters

context :

Additional information passed to the method.

eventData with time stamp stored in any data type, usually it is in JSON.

You can specify an event that is passed to the method.

class bspump.mqtt.MQTTService(app, service_name='bspump.MQTTService', connection=None)[source]

Bases: Service

__init__(app, service_name='bspump.MQTTService', connection=None)[source]

Register the service to asab.Application.Services dictionary with the provided service_name.

Parameters:
  • app (asab.Application) – Reference to ASAB application.

  • service_name – Reference name of the Service.

components_initialize()[source]
on_message(client, userdata, message)[source]
publish_event(pipeline, component, event, count_remaining)[source]

MQTTConnection

Connection to MQTT broker.

import bspump.mqtt

connection = bspump.mqtt.MQTTConnection(app, "MQTTConnection")

Configuration:

[connection:MQTTConnection]
host=localhost
port=1883
username=user
password=${MQTT_PASSWORD}

Options:

  • host - MQTT broker host

  • port - MQTT broker port (default: 1883)

  • username - Authentication username

  • password - Authentication password

  • tls - Enable TLS (true/false)

  • ca_certs - Path to CA certificate

MQTTSource

Subscribes to MQTT topics.

source = bspump.mqtt.MQTTSource(
    app, pipeline,
    connection="MQTTConnection"
)

Configuration:

[pipeline:MyPipeline:MQTTSource]
topic=sensors/#
qos=1

Options:

  • topic - Topic to subscribe to

  • topics - Comma-separated list of topics

  • qos - Quality of Service (0, 1, 2)

Wildcards:

  • + - Single level: sensors/+/temperature

  • # - Multi level: sensors/#

MQTTSink

Publishes messages to MQTT topics.

sink = bspump.mqtt.MQTTSink(
    app, pipeline,
    connection="MQTTConnection"
)

Configuration:

[pipeline:MyPipeline:MQTTSink]
topic=processed/sensors
qos=1
retain=false

Options:

  • topic - Topic to publish to

  • qos - Quality of Service (0, 1, 2)

  • retain - Retain messages (true/false)

Dynamic Topic

class TopicRouter(bspump.Processor):
    def process(self, context, event):
        context["mqtt_topic"] = f"sensors/{event['sensor_id']}"
        return event

Retained Messages

class RetainProcessor(bspump.Processor):
    def process(self, context, event):
        context["mqtt_retain"] = True
        return event

Example Pipeline

import bspump
import bspump.mqtt

class MQTTProcessingPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.mqtt.MQTTSource(app, self, connection="MQTTConnection"),
            ParseProcessor(app, self),
            bspump.mqtt.MQTTSink(app, self, connection="MQTTConnection"),
        )