Source code for bspump.trigger.periodic

import bspump.asab
import asyncio
from .trigger import Trigger


class PeriodicTrigger(Trigger):
    """Trigger driven by a ``bspump.asab.Timer``.

    The timer is created with ``autorestart=True`` and ``on_timer`` as its
    callback; ``on_timer`` calls ``self.fire()``.  In addition, a start-up
    task waits for all pipelines registered in ``app.PumpService`` to report
    ready and then invokes the timer's handler once.
    """

    def __init__(self, app, interval=None, id=None):
        """
        Interval is in seconds, can be a float or int.

        :param app: application object; must expose ``PumpService.Pipelines``.
        :param interval: value passed unchanged to ``Timer.start()``.
            NOTE(review): the default ``None`` is forwarded as-is -- confirm
            that the timer accepts it.
        :param id: identifier forwarded to ``Trigger.__init__``.
        """
        super().__init__(app, id)
        self.Timer = bspump.asab.Timer(app, self.on_timer, autorestart=True)
        self.Timer.start(interval)

        async def run_on_ready():
            # Poll (every 0.1 s) until each registered pipeline is ready.
            for pipeline in app.PumpService.Pipelines.values():
                while not pipeline.is_ready():
                    await asyncio.sleep(0.1)
            # Invoke the timer's handler right away instead of waiting for
            # the first interval to elapse (presumably this is `on_timer`;
            # verify against the Timer implementation).
            # NOTE(review): assumed to run once, after the loop -- confirm
            # against the original indentation.
            await self.Timer.Handler()

        # The event loop keeps only weak references to tasks; hold a strong
        # one so the start-up task cannot be garbage-collected mid-wait.
        self._ReadyTask = asyncio.ensure_future(run_on_ready())

    async def on_timer(self):
        """Timer callback: fire the trigger."""
        self.fire()

    @classmethod
    def construct(cls, app, definition: dict):
        """Build a trigger from a declarative definition.

        Reads ``definition["id"]`` (``None`` when absent) and
        ``definition["args"]["interval"]`` (``1.0`` when ``args`` or
        ``interval`` is absent).

        :param app: application object passed on to the constructor.
        :param definition: dictionary describing the trigger.
        :return: a new instance of ``cls``.
        """
        trigger_id = definition.get("id")
        interval = definition.get("args", {}).get("interval", 1.0)
        return cls(app, id=trigger_id, interval=interval)