Source code for bspump.trigger.trigger

import abc
import random


[docs] class Trigger(abc.ABC): """ Abstract base class for all trigger types in BitSwan. Triggers are components that fire events at specific times or under specific conditions. When a trigger fires, it notifies all attached sources to execute their cycle() method. Key features: - Manages a set of attached sources - Handles firing triggers (notifying sources) - Supports pause/resume functionality - Tracks trigger state and timing - Supports maximum triggered source limits Usage: trigger = SomeTrigger(app) source = MyTriggerSource(app, pipeline) source.on(trigger) # Attach source to trigger """
[docs] def __init__(self, app, max_triggered=None, id=None): self.Id = id if id is not None else self.__class__.__name__ self.Sources = set() self.Paused = False self.LastFireAt = 0 self.Loop = app.Loop self._max_triggered = max_triggered
[docs] def add(self, source): self.Sources.add(source)
[docs] def remove(self, source): self.Sources.remove(source)
[docs] def fire(self): if self.Paused: return self.LastFireAt = self.Loop.time() if self._max_triggered is None: for source in self.Sources: source.TriggerEvent.set() else: # Maximum number of triggered event is defined triggered = [] untriggered = [] for source in self.Sources: if source.TriggerEvent.is_set(): triggered.append(source) else: untriggered.append(source) to_trigger = self._max_triggered - len(triggered) random.shuffle(untriggered) while to_trigger > 0: if len(untriggered) == 0: break source = untriggered.pop() source.TriggerEvent.set() to_trigger -= 1
[docs] def done(self, trigger_source): """ Called by TriggerSource when cycle is completed. """ pass
[docs] def pause(self, pause=True): self.Paused = pause for source in self.Sources: source.Pipeline.PubSub.publish( "bspump.trigger.pause!", pause=pause, asynchronously=True )