bspump.trigger
Triggers for controlling when TriggerSources produce events.
- class bspump.trigger.Trigger(app, max_triggered=None, id=None)
Bases: ABC
Abstract base class for all trigger types in BitSwan.
Triggers are components that fire events at specific times or under specific conditions. When a trigger fires, it notifies all attached sources to execute their cycle() method.
Key features:
- Manages a set of attached sources
- Handles firing triggers (notifying sources)
- Supports pause/resume functionality
- Tracks trigger state and timing
- Supports maximum triggered source limits
- Usage:
trigger = SomeTrigger(app)
source = MyTriggerSource(app, pipeline)
source.on(trigger)  # Attach source to trigger
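The relationship described above (a trigger keeps a set of attached sources, notifies them when it fires, and can be paused) can be illustrated with a small stand-alone sketch. The classes SketchTrigger and SketchSource are hypothetical stand-ins written for this illustration; they are not the bspump implementations, and the real cycle() is asynchronous.

```python
# Stand-in classes for illustration only; these are NOT the bspump classes.

class SketchTrigger:
    def __init__(self):
        self.sources = set()   # sources attached to this trigger
        self.paused = False

    def add(self, source):
        self.sources.add(source)

    def pause(self, pause=True):
        self.paused = pause

    def fire(self):
        """Notify every attached source unless the trigger is paused."""
        if self.paused:
            return 0
        for source in self.sources:
            source.cycle()
        return len(self.sources)


class SketchSource:
    def __init__(self, name):
        self.name = name
        self.cycles = 0        # how many times cycle() has run

    def on(self, trigger):
        """Attach this source to a trigger and return self for chaining."""
        trigger.add(self)
        return self

    def cycle(self):
        self.cycles += 1


trigger = SketchTrigger()
a = SketchSource("a").on(trigger)
b = SketchSource("b").on(trigger)

trigger.fire()         # both sources run once
trigger.pause()
trigger.fire()         # ignored while paused
trigger.pause(False)
trigger.fire()         # both sources run again
print(a.cycles, b.cycles)  # 2 2
```

The sketch calls cycle() synchronously to stay short; it only shows the attach, fire, and pause flow.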
- class bspump.trigger.OpportunisticTrigger(app, id=None, run_immediately=True, chilldown_period=5)
Bases: Trigger
This trigger tries to trigger the pump as frequently as possible. It fires immediately when possible, after each source reports a completed cycle, and once per chilldown period (5 seconds by default, see chilldown_period).
- class bspump.trigger.RunOnceTrigger(app, id=None)
Bases: Trigger
RunOnceTrigger is an obsolete trigger that should not be used.
The following issues should be taken into consideration and need to be refactored in the future:
1) RunOnceTrigger issues the application stop.
2) The self.fire() event is triggered before the pipeline is_ready().
- class bspump.trigger.PubSubTrigger(app, message_types, pubsub=None, id=None)
Bases: Trigger
- class bspump.trigger.CronTrigger(app, cron_string, init_time, id=None)
Bases: Trigger
Trigger that uses crontab syntax for specifying execution frequency.
CronTrigger allows you to schedule pipeline execution using standard cron expressions. It calculates the next execution time based on the cron string and fires when that time is reached.
- Cron syntax:
* * * * *
| | | | |
| | | | +-- Day of week (0-7, 0 and 7 are Sunday)
| | | +---- Month (1-12)
| | +------ Day of month (1-31)
| +-------- Hour (0-23)
+---------- Minute (0-59)
Examples
"0 9 * * *"      # Daily at 9:00 AM
"*/15 * * * *"   # Every 15 minutes
"0 0 1 * *"      # First day of every month at midnight
"0 9 * * 1"      # Every Monday at 9:00 AM
Warning: Always use timezone-aware datetime objects for init_time to avoid timezone-related issues.
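To make the "calculates the next execution time" step concrete, here is a minimal pure-Python sketch that finds the first minute after a timezone-aware init_time matching a five-field cron string. The helpers parse_field and next_fire are hypothetical names written for this illustration; this is not how CronTrigger is implemented, and a dedicated cron library would normally handle this. The sketch supports only `*`, `*/n`, ranges, lists, and plain numbers.

```python
from datetime import datetime, timedelta, timezone


def parse_field(field, low, high):
    """Expand one cron field ('*', '*/n', 'a-b', 'a,b', 'a') into a set of ints."""
    values = set()
    for part in field.split(","):
        rng, _, step = part.partition("/")
        step = int(step) if step else 1
        if rng == "*":
            start, end = low, high
        elif "-" in rng:
            start, end = (int(x) for x in rng.split("-"))
        else:
            start = end = int(rng)
        values.update(range(start, end + 1, step))
    return values


def next_fire(cron_string, init_time):
    """Return the first whole minute strictly after init_time that matches."""
    if init_time.tzinfo is None:
        raise ValueError("init_time must be timezone-aware")
    minute_f, hour_f, dom_f, month_f, dow_f = cron_string.split()
    minutes = parse_field(minute_f, 0, 59)
    hours = parse_field(hour_f, 0, 23)
    doms = parse_field(dom_f, 1, 31)
    months = parse_field(month_f, 1, 12)
    dows = {d % 7 for d in parse_field(dow_f, 0, 7)}  # 7 also means Sunday

    candidate = init_time.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(4 * 366 * 24 * 60):  # give up after roughly four years
        cron_dow = (candidate.weekday() + 1) % 7  # Python: Monday=0, cron: Sunday=0
        dom_ok = candidate.day in doms
        dow_ok = cron_dow in dows
        # Classic cron rule: if both day fields are restricted, either may match.
        if dom_f != "*" and dow_f != "*":
            day_ok = dom_ok or dow_ok
        else:
            day_ok = dom_ok and dow_ok
        if (candidate.minute in minutes and candidate.hour in hours
                and candidate.month in months and day_ok):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError("no matching time found for " + cron_string)


init_time = datetime(2024, 1, 1, 8, 30, tzinfo=timezone.utc)  # a Monday
print(next_fire("0 9 * * *", init_time).isoformat())     # 2024-01-01T09:00:00+00:00
print(next_fire("*/15 * * * *", init_time).isoformat())  # 2024-01-01T08:45:00+00:00
print(next_fire("0 0 1 * *", init_time).isoformat())     # 2024-02-01T00:00:00+00:00
```

Stepping minute by minute is slow but easy to verify. Rejecting naive datetimes mirrors the warning above about timezone-aware init_time values.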
CronTrigger
Schedule-based triggers using cron expressions.
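from datetime import datetime, timezone
from bspump.trigger import CronTrigger
init_time = datetime.now(timezone.utc)  # timezone-aware; the signature above requires init_time
trigger = CronTrigger(app, "*/5 * * * *", init_time)  # Every 5 minutes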
Cron Expression Format:
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6, Sunday = 0)
│ │ │ │ │
* * * * *
Examples (init_time is a timezone-aware datetime, required by the signature above):
CronTrigger(app, "*/10 * * * *", init_time) # Every 10 minutes
CronTrigger(app, "0 * * * *", init_time) # Every hour
CronTrigger(app, "0 0 * * *", init_time) # Daily at midnight
CronTrigger(app, "0 9 * * 1", init_time) # Monday at 9 AM
CronTrigger(app, "0 0 1 * *", init_time) # First of each month
CronTrigger(app, "0 18 * * 1-5", init_time) # Weekdays at 6 PM
PubSubTrigger
Event-driven triggers using internal pub/sub.
from bspump.trigger import PubSubTrigger
trigger = PubSubTrigger(app, "my.event.topic")
# Fire the trigger from elsewhere
app.PubSub.publish("my.event.topic", data={"key": "value"})
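The publish-then-fire flow can be sketched without the framework. MiniPubSub and MiniPubSubTrigger below are hypothetical stand-ins written for this illustration; they are not the application's PubSub or the real PubSubTrigger, and they only record what would have fired.

```python
from collections import defaultdict


class MiniPubSub:
    """Hypothetical in-process pub/sub registry, for illustration only."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, message_type, callback):
        self.subscribers[message_type].append(callback)

    def publish(self, message_type, **kwargs):
        # Deliver the message to every callback registered for this type
        for callback in self.subscribers[message_type]:
            callback(message_type, **kwargs)


class MiniPubSubTrigger:
    """Records a 'fire' whenever one of its message types is published."""

    def __init__(self, pubsub, message_types):
        if isinstance(message_types, str):
            message_types = [message_types]
        self.fired = []
        for message_type in message_types:
            pubsub.subscribe(message_type, self.on_message)

    def on_message(self, message_type, **kwargs):
        self.fired.append((message_type, kwargs))


pubsub = MiniPubSub()
trigger = MiniPubSubTrigger(pubsub, "my.event.topic")

pubsub.publish("my.event.topic", data={"key": "value"})
pubsub.publish("other.topic")  # nobody subscribed, so nothing fires
print(trigger.fired)  # [('my.event.topic', {'data': {'key': 'value'}})]
```

Accepting either a single string or a list for message_types is an assumption of this sketch.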
PeriodicTrigger
Simple interval-based triggers.
from bspump.trigger import PeriodicTrigger
trigger = PeriodicTrigger(app, 10) # Every 10 seconds
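Interval-based firing can be approximated with a plain asyncio loop. The run_periodic helper and its count parameter are hypothetical; they exist only so this sketch terminates, whereas a real periodic trigger keeps running for the lifetime of the application. This is not the PeriodicTrigger implementation.

```python
import asyncio


async def run_periodic(interval, fire, count):
    """Call fire() every `interval` seconds, `count` times, then stop."""
    for _ in range(count):
        await asyncio.sleep(interval)
        fire()


fired = []
# A short interval keeps the demonstration fast; the example above uses 10 seconds.
asyncio.run(run_periodic(0.01, lambda: fired.append(len(fired) + 1), count=3))
print(fired)  # [1, 2, 3]
```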
OpportunisticTrigger
Fires as frequently as possible: immediately when possible, after each completed source cycle, and once per chilldown period.
from bspump.trigger import OpportunisticTrigger
trigger = OpportunisticTrigger(app)
Using Triggers
Attach triggers to a TriggerSource:
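from datetime import datetime, timezone
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger

class MySource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        # Wait until the pipeline can accept events, then emit one
        await self.Pipeline.ready()
        event = {"timestamp": datetime.now(timezone.utc).isoformat()}
        await self.Pipeline.process(event)

# init_time is required by the CronTrigger signature; keep it timezone-aware
source = MySource(app, pipeline).on(
    CronTrigger(app, "*/5 * * * *", datetime.now(timezone.utc))
)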
Multiple Triggers
A source can have multiple triggers:
source = MySource(app, pipeline).on(
    CronTrigger(app, "0 * * * *", init_time),  # Every hour; init_time is a timezone-aware datetime
    PubSubTrigger(app, "force.run")  # On-demand
)
The source’s cycle() method runs when any trigger fires.
Custom Triggers
Create custom triggers:
from bspump.trigger import Trigger

class WebhookTrigger(Trigger):
    def __init__(self, app, path):
        super().__init__(app)
        self.path = path
        # Set up webhook handler

    async def handle_webhook(self, request):
        await self.fire()