Source code for bspump.mqtt.service

import re
import json
import logging
import time
from bspump.asab import Service, Config
from bspump.asab.web.rest.json import JSONDumper

L = logging.getLogger(__name__)


def get_pipeline_topology(pump, pipeline):
    dumper = JSONDumper(pretty=False)
    pipelines = json.loads(dumper(pump.Pipelines))

    pipeline_obj = pump.Pipelines[pipeline]

    pipeline_data = pipelines[pipeline]
    components = []
    metrics_components = []
    components.append(pipeline_data["Sources"][0])
    # get metrics
    for metric in pipeline_data["Metrics"]:
        if (
            metric["type"] == "Counter"
            and metric["name"] == "bspump.pipeline.eps_processor"
        ):
            metrics_components.append(metric)
    for depth in pipeline_data["Processors"]:
        for component in depth:
            components.append(component)

    output = {
        "topology": {},
        "display-style": "graph",
        "display-priority": "shown",
    }
    for i in range(len(components)):
        component_data = {}

        component_obj = pipeline_obj.locate_source(components[i]["Id"])
        if component_obj is None:
            component_obj = pipeline_obj.locate_processor(components[i]["Id"])

        # If it's not the last element, add the "wires" key
        if i < len(components) - 1:
            component_data["wires"] = [components[i + 1]["Id"]]
        else:
            component_data["wires"] = []

        # Implement getting properties
        component_data["properties"] = {
            key: value for key, value in component_obj.Config.items()
        }
        component_data["capabilities"] = ["subscribable-events"]

        for metric in metrics_components:
            if metric["static_tags"]["processor"] == components[i]["Id"]:
                component_data["metrics"] = {
                    "eps.in": metric["fieldset"][0]["values"]["eps.in"],
                    "eps.out": metric["fieldset"][0]["values"]["eps.out"],
                }
                break
        output["topology"][components[i]["Id"]] = component_data

    # add metrics to the source as it doesn't have any metrics from the metric service
    if "metrics" not in output["topology"][components[0]["Id"]]:
        if "eps.out" in output["topology"][components[1]["Id"]]["metrics"]:
            output["topology"][components[0]["Id"]]["metrics"] = {}
            output["topology"][components[0]["Id"]]["metrics"]["eps.out"] = int(
                output["topology"][components[1]["Id"]]["metrics"]["eps.out"]
            )

    message = get_message_structure()
    message["data"] = output
    message["count"] = 1
    message["remaining_subscription_count"] = 1

    return message


def get_pipelines(pipelines: dict):
    output = {"topology": {}, "display-style": "graph", "display-priority": "hidden"}
    for pipeline in pipelines.values():
        pipeline_dict = {
            "wires": [],
            "properties": [],
            "metrics": [],
            "capabilities": ["has-children"],
        }
        output["topology"][pipeline.Id] = pipeline_dict

    message = get_message_structure()
    message["data"] = output
    message["count"] = 1
    message["remaining_subscription_count"] = 1

    return message


def get_message_structure():
    return {
        "timestamp": time.time_ns(),
        "data": {},
        "count": 0,
        "remaining_subscription_count": 0,
    }


[docs] class MQTTService(Service):
[docs] def __init__(self, app, service_name="bspump.MQTTService", connection=None): super().__init__(app, service_name) self.App = app self.dumper = JSONDumper(pretty=False) self.Connection = None self.ConnectionId = connection try: self.max_count = int(Config["mqtt"].get("max_count")) except Exception: L.warning( "MQTTService: The max_count parameter is not set in the configuration file or is not an integer. Default value is 100." ) self.max_count = 100
[docs] def components_initialize(self): svc = self.App.get_service("bspump.PumpService") self.Connection = svc.locate_connection(self.ConnectionId) for pipeline in svc.Pipelines.values(): self.Connection.publish_to_topic( f"/c/{pipeline.Id}/topology", json.dumps(get_pipeline_topology(svc, pipeline.Id)), retain=True, ) for depth in pipeline.Processors: for component in depth: pipeline.PublishingProcessors[component.Id] = 0 self.Connection.subscribe_topic( f"/c/{pipeline.Id}/c/{component.Id}/events/subscribe" ) self.Connection.register_handler( f"/c/{pipeline.Id}/c/{component.Id}/events/subscribe", self.on_message, ) for source in pipeline.Sources: self.Connection.subscribe_topic( f"/c/{pipeline.Id}/c/{source.Id}/events/subscribe" ) self.Connection.register_handler( f"/c/{pipeline.Id}/c/{source.Id}/events/subscribe", self.on_message, ) self.Connection.publish_to_topic( "/topology", json.dumps(get_pipelines(svc.Pipelines)), retain=True, )
[docs] def on_message(self, client, userdata, message): payload = message.payload.decode("utf-8") svc = self.App.get_service("bspump.PumpService") topic = message.topic # Regex patterns events_pattern = r"^/c/(?P<pipeline_identifier>[^/]+)/c/(?P<component_identifier>[^/]+)/events/subscribe$" # Matching events = re.match(events_pattern, topic) # Get list of pipelines from application try: payload = json.loads(payload) except json.decoder.JSONDecodeError: payload = message.payload.decode("utf-8") L.warning( f"Payload sent to {topic} is not a valid JSON. Payload: {payload}" ) return count = payload.get("count") if count is None or count < 0: return count = min(count, self.max_count) if events: pipeline = events.group("pipeline_identifier") processor = events.group("component_identifier") pipeline = svc.locate(f"{pipeline}") if pipeline is None: L.warning(f"Pipeline {pipeline} not found") return source = pipeline.locate_source(f"{processor}") if source is not None: source.EventsToPublish = count else: processor = pipeline.locate_processor(f"{processor}") if processor is None: L.warning(f"Processor {processor} not found") return pipeline.PublishingProcessors[processor.Id] = max( count, pipeline.PublishingProcessors[processor.Id] )
[docs] def publish_event(self, pipeline, component, event, count_remaining): data = get_message_structure() if isinstance(event, bytes): event = event.decode("utf-8") data["data"] = event data["count"] = component.EventCount data["remaining_subscription_count"] = count_remaining self.Connection.publish_to_topic( f"/c/{pipeline}/c/{component.Id}/events", json.dumps(data, default=lambda x: x.__class__.__name__), )