MQTT Pipeline Inspection & Operation Protocol¶
The MPIOP provides a set of MQTT topics which can be used to inspect (and in the future operate) pipelines.
Getting Pipeline Topology¶
List Pipelines in a Container¶
Send get as the payload to:
c/{container_id}/topology/get
You will receive the topology by subscribing to:
c/{container_id}/topology
List Components in a Pipeline¶
Send get as the payload to:
c/{container_id}/c/{pipeline_id}/topology/get
You will receive the topology by subscribing to:
c/{container_id}/c/{pipeline_id}/topology
Subscribing to Events¶
To subscribe to events flowing through a given component, send:
{
"event_count": 200
}
To:
c/{container_id}/c/{pipeline_id}/events/subscribe
The next 200 events to flow out of that component will be sent to:
c/{container_id}/c/{pipeline_id}/events
Use Cases¶
Debugging¶
Monitor events flowing through specific pipeline components to debug processing issues.
Monitoring¶
Track event throughput and identify bottlenecks in real-time.
Testing¶
Subscribe to events during development to validate pipeline behavior.
Example: Python MQTT Client¶
import paho.mqtt.client as mqtt
import json
def on_message(client, userdata, message):
payload = message.payload.decode()
print(f"Topic: {message.topic}")
print(f"Payload: {payload}")
client = mqtt.Client()
client.on_message = on_message
client.connect("mqtt-broker", 1883)
# Subscribe to topology
container_id = "my-container"
client.subscribe(f"c/{container_id}/topology")
# Request topology
client.publish(f"c/{container_id}/topology/get", "get")
client.loop_forever()
Configuration¶
Enable MQTT visibility in your BSPump application:
[mqtt:visibility]
enabled=true
broker=localhost
port=1883
Security Considerations¶
Authentication: Use MQTT authentication to restrict access
Authorization: Limit which topics users can subscribe to
Encryption: Use TLS for MQTT connections in production
Event filtering: Be cautious about exposing sensitive data