Source code for bspump.trigger.opportunistic
from .trigger import Trigger
[docs]
class OpportunisticTrigger(Trigger):
"""
This trigger tries to trigger the pump as frequenty as possible.
It triggers immediately when possible, after each Source report completed cycle and in 5 sec. period (see chilldown period)
"""
[docs]
def __init__(self, app, id=None, run_immediately=True, chilldown_period=5):
super().__init__(app, id=id)
self.ChilldownPeriod = chilldown_period # Seconds
app.PubSub.subscribe("Application.tick!", self.on_tick)
if run_immediately:
self.Loop.call_soon(self.on_tick)
[docs]
def pause(self, pause=True):
super().pause(pause)
if not self.Paused:
self.Loop.call_soon(self.on_tick)
[docs]
def on_tick(self, event_type="simulated"):
now = self.Loop.time()
if (self.LastFireAt != 0) and (now < (self.LastFireAt + self.ChilldownPeriod)):
return
self.fire()
[docs]
def done(self, trigger_source):
self.Loop.call_soon(self.on_tick)
[docs]
@classmethod
def construct(cls, app, definition: dict):
id = definition.get("id")
run_immediately = definition.get("args", {}).get("run_immediately", True)
chilldown_period = definition.get("args", {}).get("chilldown_period", 5)
return cls(
app,
id=id,
run_immediately=run_immediately,
chilldown_period=chilldown_period,
)