Source code for bspump.trigger.pubsub

from .trigger import Trigger


[docs] class PubSubTrigger(Trigger):
[docs] def __init__(self, app, message_types, pubsub=None, id=None): super().__init__(app, id) self.PubSub = pubsub if pubsub is not None else app.PubSub if isinstance(message_types, str): self.PubSub.subscribe(message_types, self.on_message) else: for message_type in message_types: self.PubSub.subscribe(message_type, self.on_message)
[docs] async def on_message(self, message_type): self.fire()
[docs] @classmethod def construct(cls, app, definition: dict): id = definition.get("id") message_types = definition.get("args", {}).get("message_types") return cls(app, id=id, message_types=message_types)